From 71d261ab8fb3e7fb22d2687b8e038129ca766a65 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Fri, 8 Jan 2021 20:04:56 +0900 Subject: [PATCH] [SPARK-34032][SS] Add truststore and keystore type config possibility for Kafka delegation token ### What changes were proposed in this pull request? Kafka delegation token is obtained with `AdminClient` where security settings can be set. Keystore and truststore type however can't be set. In this PR I've added these new configurations. This can be useful when the type is different. A good example is to make Spark FIPS compliant where the default JKS is not accepted. ### Why are the changes needed? Missing configurations. ### Does this PR introduce _any_ user-facing change? Yes, adding 2 additional config parameters. ### How was this patch tested? Existing + modified unit tests + simple Kafka to Kafka app on cluster. Closes #31070 from gaborgsomogyi/SPARK-34032. Authored-by: Gabor Somogyi Signed-off-by: Jungtaek Lim (HeartSaVioR) --- docs/structured-streaming-kafka-integration.md | 17 +++++++++++++++++ .../spark/kafka010/KafkaTokenSparkConf.scala | 6 ++++++ .../apache/spark/kafka010/KafkaTokenUtil.scala | 6 ++++++ .../kafka010/KafkaDelegationTokenTest.scala | 4 ++++ .../kafka010/KafkaTokenSparkConfSuite.scala | 10 ++++++++++ .../spark/kafka010/KafkaTokenUtilSuite.scala | 6 ++++++ 6 files changed, 49 insertions(+) diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md index 5336695478..bf25d46f2e 100644 --- a/docs/structured-streaming-kafka-integration.md +++ b/docs/structured-streaming-kafka-integration.md @@ -1004,6 +1004,14 @@ Delegation tokens can be obtained from multiple clusters and ${cluster} 3.0.0 + + spark.kafka.clusters.${cluster}.ssl.truststore.type + None + + The file format of the trust store file. For further details please see Kafka documentation. Only used to obtain delegation token. 
+ + 3.2.0 + spark.kafka.clusters.${cluster}.ssl.truststore.location None @@ -1021,6 +1029,15 @@ Delegation tokens can be obtained from multiple clusters and ${cluster} 3.0.0 + + spark.kafka.clusters.${cluster}.ssl.keystore.type + None + + The file format of the key store file. This is optional for client. + For further details please see Kafka documentation. Only used to obtain delegation token. + + 3.2.0 + spark.kafka.clusters.${cluster}.ssl.keystore.location None diff --git a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenSparkConf.scala b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenSparkConf.scala index ed4a6f1e34..21ba7b21ed 100644 --- a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenSparkConf.scala +++ b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenSparkConf.scala @@ -31,8 +31,10 @@ private[spark] case class KafkaTokenClusterConf( targetServersRegex: String, securityProtocol: String, kerberosServiceName: String, + trustStoreType: Option[String], trustStoreLocation: Option[String], trustStorePassword: Option[String], + keyStoreType: Option[String], keyStoreLocation: Option[String], keyStorePassword: Option[String], keyPassword: Option[String], @@ -44,8 +46,10 @@ private[spark] case class KafkaTokenClusterConf( s"targetServersRegex=$targetServersRegex, " + s"securityProtocol=$securityProtocol, " + s"kerberosServiceName=$kerberosServiceName, " + + s"trustStoreType=$trustStoreType, " + s"trustStoreLocation=$trustStoreLocation, " + s"trustStorePassword=${trustStorePassword.map(_ => REDACTION_REPLACEMENT_TEXT)}, " + + s"keyStoreType=$keyStoreType, " + s"keyStoreLocation=$keyStoreLocation, " + s"keyStorePassword=${keyStorePassword.map(_ => REDACTION_REPLACEMENT_TEXT)}, " + s"keyPassword=${keyPassword.map(_ => REDACTION_REPLACEMENT_TEXT)}, " + @@ -77,8 +81,10 @@ private [kafka010] object KafkaTokenSparkConf 
extends Logging { DEFAULT_SECURITY_PROTOCOL_CONFIG), sparkClusterConf.getOrElse(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, KafkaTokenSparkConf.DEFAULT_SASL_KERBEROS_SERVICE_NAME), + sparkClusterConf.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG), sparkClusterConf.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG), sparkClusterConf.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG), + sparkClusterConf.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG), sparkClusterConf.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG), sparkClusterConf.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG), sparkClusterConf.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG), diff --git a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala index f3f6b4de6f..a182d3c308 100644 --- a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala +++ b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala @@ -162,6 +162,9 @@ private[spark] object KafkaTokenUtil extends Logging { private def setTrustStoreProperties( clusterConf: KafkaTokenClusterConf, properties: ju.Properties): Unit = { + clusterConf.trustStoreType.foreach { truststoreType => + properties.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, truststoreType) + } clusterConf.trustStoreLocation.foreach { truststoreLocation => properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreLocation) } @@ -173,6 +176,9 @@ private[spark] object KafkaTokenUtil extends Logging { private def setKeyStoreProperties( clusterConf: KafkaTokenClusterConf, properties: ju.Properties): Unit = { + clusterConf.keyStoreType.foreach { keystoreType => + properties.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, keystoreType) + } clusterConf.keyStoreLocation.foreach { keystoreLocation => properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keystoreLocation) } diff --git 
a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala index 19335f4221..8271acdc7d 100644 --- a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala +++ b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala @@ -51,8 +51,10 @@ trait KafkaDelegationTokenTest extends BeforeAndAfterEach { protected val bootStrapServers = "127.0.0.1:0" protected val matchingTargetServersRegex = "127.0.0.*:0" protected val nonMatchingTargetServersRegex = "127.0.intentionally_non_matching.*:0" + protected val trustStoreType = "customTrustStoreType" protected val trustStoreLocation = "/path/to/trustStore" protected val trustStorePassword = "trustStoreSecret" + protected val keyStoreType = "customKeyStoreType" protected val keyStoreLocation = "/path/to/keyStore" protected val keyStorePassword = "keyStoreSecret" protected val keyPassword = "keySecret" @@ -124,8 +126,10 @@ trait KafkaDelegationTokenTest extends BeforeAndAfterEach { KafkaTokenSparkConf.DEFAULT_TARGET_SERVERS_REGEX, securityProtocol, KafkaTokenSparkConf.DEFAULT_SASL_KERBEROS_SERVICE_NAME, + Some(trustStoreType), Some(trustStoreLocation), Some(trustStorePassword), + Some(keyStoreType), Some(keyStoreLocation), Some(keyStorePassword), Some(keyPassword), diff --git a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenSparkConfSuite.scala b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenSparkConfSuite.scala index 61184a6fac..17caf96818 100644 --- a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenSparkConfSuite.scala +++ b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenSparkConfSuite.scala @@ -29,8 +29,10 @@ class 
KafkaTokenSparkConfSuite extends SparkFunSuite with BeforeAndAfterEach { private val targetServersRegex = "127.0.0.*:0" private val securityProtocol = SSL.name private val kerberosServiceName = "kafka1" + private val trustStoreType = "customTrustStoreType" private val trustStoreLocation = "/path/to/trustStore" private val trustStorePassword = "trustStoreSecret" + private val keyStoreType = "customKeyStoreType" private val keyStoreLocation = "/path/to/keyStore" private val keyStorePassword = "keyStoreSecret" private val keyPassword = "keySecret" @@ -60,8 +62,10 @@ class KafkaTokenSparkConfSuite extends SparkFunSuite with BeforeAndAfterEach { assert(clusterConfig.securityProtocol === SASL_SSL.name) assert(clusterConfig.kerberosServiceName === KafkaTokenSparkConf.DEFAULT_SASL_KERBEROS_SERVICE_NAME) + assert(clusterConfig.trustStoreType === None) assert(clusterConfig.trustStoreLocation === None) assert(clusterConfig.trustStorePassword === None) + assert(clusterConfig.keyStoreType === None) assert(clusterConfig.keyStoreLocation === None) assert(clusterConfig.keyStorePassword === None) assert(clusterConfig.keyPassword === None) @@ -75,8 +79,10 @@ class KafkaTokenSparkConfSuite extends SparkFunSuite with BeforeAndAfterEach { sparkConf.set(s"spark.kafka.clusters.$identifier1.security.protocol", securityProtocol) sparkConf.set(s"spark.kafka.clusters.$identifier1.sasl.kerberos.service.name", kerberosServiceName) + sparkConf.set(s"spark.kafka.clusters.$identifier1.ssl.truststore.type", trustStoreType) sparkConf.set(s"spark.kafka.clusters.$identifier1.ssl.truststore.location", trustStoreLocation) sparkConf.set(s"spark.kafka.clusters.$identifier1.ssl.truststore.password", trustStorePassword) + sparkConf.set(s"spark.kafka.clusters.$identifier1.ssl.keystore.type", keyStoreType) sparkConf.set(s"spark.kafka.clusters.$identifier1.ssl.keystore.location", keyStoreLocation) sparkConf.set(s"spark.kafka.clusters.$identifier1.ssl.keystore.password", keyStorePassword) 
sparkConf.set(s"spark.kafka.clusters.$identifier1.ssl.key.password", keyPassword) @@ -88,8 +94,10 @@ class KafkaTokenSparkConfSuite extends SparkFunSuite with BeforeAndAfterEach { assert(clusterConfig.targetServersRegex === targetServersRegex) assert(clusterConfig.securityProtocol === securityProtocol) assert(clusterConfig.kerberosServiceName === kerberosServiceName) + assert(clusterConfig.trustStoreType === Some(trustStoreType)) assert(clusterConfig.trustStoreLocation === Some(trustStoreLocation)) assert(clusterConfig.trustStorePassword === Some(trustStorePassword)) + assert(clusterConfig.keyStoreType === Some(keyStoreType)) assert(clusterConfig.keyStoreLocation === Some(keyStoreLocation)) assert(clusterConfig.keyStorePassword === Some(keyStorePassword)) assert(clusterConfig.keyPassword === Some(keyPassword)) @@ -127,8 +135,10 @@ class KafkaTokenSparkConfSuite extends SparkFunSuite with BeforeAndAfterEach { assert(clusterConfig.securityProtocol === SASL_SSL.name) assert(clusterConfig.kerberosServiceName === KafkaTokenSparkConf.DEFAULT_SASL_KERBEROS_SERVICE_NAME) + assert(clusterConfig.trustStoreType === None) assert(clusterConfig.trustStoreLocation === None) assert(clusterConfig.trustStorePassword === None) + assert(clusterConfig.keyStoreType === None) assert(clusterConfig.keyStoreLocation === None) assert(clusterConfig.keyStorePassword === None) assert(clusterConfig.keyPassword === None) diff --git a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala index 94f7853003..ca34e14f2c 100644 --- a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala +++ b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala @@ -64,8 +64,10 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest { === 
bootStrapServers) assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG) === SASL_PLAINTEXT.name) + assert(!adminClientProperties.containsKey(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG)) assert(!adminClientProperties.containsKey(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)) assert(!adminClientProperties.containsKey(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)) + assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG)) assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)) assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)) assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEY_PASSWORD_CONFIG)) @@ -80,10 +82,12 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest { === bootStrapServers) assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG) === SASL_SSL.name) + assert(adminClientProperties.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG) === trustStoreType) assert(adminClientProperties.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG) === trustStoreLocation) assert(adminClientProperties.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG) === trustStorePassword) + assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG)) assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)) assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)) assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEY_PASSWORD_CONFIG)) @@ -99,10 +103,12 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest { === bootStrapServers) assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG) === SSL.name) + assert(adminClientProperties.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG) === trustStoreType) assert(adminClientProperties.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG) === trustStoreLocation) 
assert(adminClientProperties.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG) === trustStorePassword) + assert(adminClientProperties.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG) === keyStoreType) assert(adminClientProperties.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG) === keyStoreLocation) assert(adminClientProperties.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG) === keyStorePassword) assert(adminClientProperties.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG) === keyPassword)