[SPARK-28055][SS][DSTREAMS] Add delegation token custom AdminClient configurations.

## What changes were proposed in this pull request?

At the moment Kafka delegation tokens are fetched through `AdminClient`, but there is no way to pass custom configuration parameters to it. The source and sink [options](https://spark.apache.org/docs/2.4.3/structured-streaming-kafka-integration.html#kafka-specific-configurations) already make it possible to add custom Kafka configurations.

In this PR I've added the same possibility to the `AdminClient` used for delegation token fetching: any parameter set with the `spark.kafka.clusters.${cluster}.kafka.` prefix is forwarded to it.
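As an illustration (not part of the patch), here is a minimal sketch of setting such a parameter programmatically; the cluster identifier `cluster1`, the broker address, and the `retries` value are made-up examples:

```scala
import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  // Existing config: where the delegation token is obtained from.
  .set("spark.kafka.clusters.cluster1.auth.bootstrap.servers", "broker1:9092")
  // New in this PR: everything after the "kafka." segment is handed to the
  // token-fetching AdminClient as a raw Kafka parameter.
  .set("spark.kafka.clusters.cluster1.kafka.retries", "1")
```

The same can be done on the command line with `--conf spark.kafka.clusters.cluster1.kafka.retries=1`.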

## How was this patch tested?

Existing + added unit tests.

```
cd docs/
SKIP_API=1 jekyll build
```
Manual webpage check.

Closes #24875 from gaborgsomogyi/SPARK-28055.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
Commit d47c219f94, parent d1ef6be4c3.
Committed by Marcelo Vanzin on 2019-07-11 09:36:24 -07:00.
6 changed files with 50 additions and 8 deletions.

`docs/structured-streaming-kafka-integration.md`

```diff
@@ -818,6 +818,11 @@ Delegation tokens can be obtained from multiple clusters and <code>${cluster}</code>
   </tr>
 </table>
 
+#### Kafka Specific Configurations
+
+Kafka's own configurations can be set with `kafka.` prefix, e.g., `--conf spark.kafka.clusters.${cluster}.kafka.retries=1`.
+For possible Kafka parameters, see [Kafka adminclient config docs](http://kafka.apache.org/documentation.html#adminclientconfigs).
+
 #### Caveats
 
 - Obtaining delegation token for proxy user is not yet supported ([KAFKA-6945](https://issues.apache.org/jira/browse/KAFKA-6945)).
```

`KafkaTokenSparkConf.scala`

```diff
@@ -23,6 +23,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol.SASL_SSL
 
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils.REDACTION_REPLACEMENT_TEXT
 
 private[spark] case class KafkaTokenClusterConf(
     identifier: String,
@@ -35,7 +36,8 @@ private[spark] case class KafkaTokenClusterConf(
     keyStoreLocation: Option[String],
     keyStorePassword: Option[String],
     keyPassword: Option[String],
-    tokenMechanism: String) {
+    tokenMechanism: String,
+    specifiedKafkaParams: Map[String, String]) {
   override def toString: String = s"KafkaTokenClusterConf{" +
     s"identifier=$identifier, " +
     s"authBootstrapServers=$authBootstrapServers, " +
@@ -43,11 +45,12 @@ private[spark] case class KafkaTokenClusterConf(
     s"securityProtocol=$securityProtocol, " +
     s"kerberosServiceName=$kerberosServiceName, " +
     s"trustStoreLocation=$trustStoreLocation, " +
-    s"trustStorePassword=${trustStorePassword.map(_ => "xxx")}, " +
+    s"trustStorePassword=${trustStorePassword.map(_ => REDACTION_REPLACEMENT_TEXT)}, " +
     s"keyStoreLocation=$keyStoreLocation, " +
-    s"keyStorePassword=${keyStorePassword.map(_ => "xxx")}, " +
-    s"keyPassword=${keyPassword.map(_ => "xxx")}, " +
-    s"tokenMechanism=$tokenMechanism}"
+    s"keyStorePassword=${keyStorePassword.map(_ => REDACTION_REPLACEMENT_TEXT)}, " +
+    s"keyPassword=${keyPassword.map(_ => REDACTION_REPLACEMENT_TEXT)}, " +
+    s"tokenMechanism=$tokenMechanism, " +
+    s"specifiedKafkaParams=${KafkaRedactionUtil.redactParams(specifiedKafkaParams.toSeq)}}"
 }
 
 private [kafka010] object KafkaTokenSparkConf extends Logging {
@@ -59,6 +62,8 @@ private [kafka010] object KafkaTokenSparkConf extends Logging {
   def getClusterConfig(sparkConf: SparkConf, identifier: String): KafkaTokenClusterConf = {
     val configPrefix = s"$CLUSTERS_CONFIG_PREFIX$identifier."
     val sparkClusterConf = sparkConf.getAllWithPrefix(configPrefix).toMap
+    val configKafkaPrefix = s"${configPrefix}kafka."
+    val sparkClusterKafkaConf = sparkConf.getAllWithPrefix(configKafkaPrefix).toMap
     val result = KafkaTokenClusterConf(
       identifier,
       sparkClusterConf
@@ -76,7 +81,8 @@ private [kafka010] object KafkaTokenSparkConf extends Logging {
       sparkClusterConf.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG),
       sparkClusterConf.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG),
       sparkClusterConf.getOrElse("sasl.token.mechanism",
-        KafkaTokenSparkConf.DEFAULT_SASL_TOKEN_MECHANISM)
+        KafkaTokenSparkConf.DEFAULT_SASL_TOKEN_MECHANISM),
+      sparkClusterKafkaConf
     )
     logDebug(s"getClusterConfig($identifier): $result")
     result
```

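For context, `SparkConf.getAllWithPrefix` returns the matching entries with the prefix stripped, so only the raw Kafka parameter names remain as keys. A small sketch (not from the patch; the cluster name and value are made up):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.kafka.clusters.cluster1.kafka.retries", "1")

// The full prefix is "spark.kafka.clusters.cluster1." plus "kafka.", so the
// bare Kafka parameter name is what ends up as the map key.
val kafkaParams = conf.getAllWithPrefix("spark.kafka.clusters.cluster1.kafka.").toMap
assert(kafkaParams == Map("retries" -> "1"))
```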
`KafkaTokenUtil.scala`

```diff
@@ -134,6 +134,16 @@ private[spark] object KafkaTokenUtil extends Logging {
       }
     }
 
+    logDebug("AdminClient params before specified params: " +
+      s"${KafkaRedactionUtil.redactParams(adminClientProperties.asScala.toSeq)}")
+
+    clusterConf.specifiedKafkaParams.foreach { param =>
+      adminClientProperties.setProperty(param._1, param._2)
+    }
+
+    logDebug("AdminClient params after specified params: " +
+      s"${KafkaRedactionUtil.redactParams(adminClientProperties.asScala.toSeq)}")
+
     adminClientProperties
   }
 
```

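Because the specified params are applied after the defaults have been computed, a user-supplied key overrides whatever value Spark would otherwise set. A minimal sketch of that ordering with plain `java.util.Properties` (the `retries` values are made up):

```scala
import java.util.Properties

val props = new Properties()
props.setProperty("retries", "2147483647") // pretend this is a computed default

// Applying the user-specified params last means they win on key collisions.
Map("retries" -> "1").foreach { param =>
  props.setProperty(param._1, param._2)
}
assert(props.getProperty("retries") == "1")
```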
`KafkaDelegationTokenTest.scala`

```diff
@@ -107,7 +107,8 @@ trait KafkaDelegationTokenTest extends BeforeAndAfterEach {
 
   protected def createClusterConf(
       identifier: String,
-      securityProtocol: String): KafkaTokenClusterConf = {
+      securityProtocol: String,
+      specifiedKafkaParams: Map[String, String] = Map.empty): KafkaTokenClusterConf = {
     KafkaTokenClusterConf(
       identifier,
       bootStrapServers,
@@ -119,6 +120,7 @@ trait KafkaDelegationTokenTest extends BeforeAndAfterEach {
       Some(keyStoreLocation),
       Some(keyStorePassword),
       Some(keyPassword),
-      KafkaTokenSparkConf.DEFAULT_SASL_TOKEN_MECHANISM)
+      KafkaTokenSparkConf.DEFAULT_SASL_TOKEN_MECHANISM,
+      specifiedKafkaParams)
   }
 }
```

`KafkaTokenSparkConfSuite.scala`

```diff
@@ -96,6 +96,16 @@ class KafkaTokenSparkConfSuite extends SparkFunSuite with BeforeAndAfterEach {
     assert(clusterConfig.tokenMechanism === tokenMechanism)
   }
 
+  test("getClusterConfig should return specified kafka params") {
+    sparkConf.set(s"spark.kafka.clusters.$identifier1.auth.bootstrap.servers", authBootStrapServers)
+    sparkConf.set(s"spark.kafka.clusters.$identifier1.kafka.customKey", "customValue")
+
+    val clusterConfig = KafkaTokenSparkConf.getClusterConfig(sparkConf, identifier1)
+    assert(clusterConfig.identifier === identifier1)
+    assert(clusterConfig.authBootstrapServers === authBootStrapServers)
+    assert(clusterConfig.specifiedKafkaParams === Map("customKey" -> "customValue"))
+  }
+
   test("getAllClusterConfigs should return empty list when nothing configured") {
     assert(KafkaTokenSparkConf.getAllClusterConfigs(sparkConf).isEmpty)
   }
```

`KafkaTokenUtilSuite.scala`

```diff
@@ -155,6 +155,15 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest {
     assert(saslJaasConfig.contains("useTicketCache=true"))
   }
 
+  test("createAdminClientProperties with specified params should include it") {
+    val clusterConf = createClusterConf(identifier1, SASL_SSL.name,
+      Map("customKey" -> "customValue"))
+
+    val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf, clusterConf)
+
+    assert(adminClientProperties.get("customKey") === "customValue")
+  }
+
   test("isGlobalJaasConfigurationProvided without global config should return false") {
     assert(!KafkaTokenUtil.isGlobalJaasConfigurationProvided)
   }
```