[SPARK-33635][SS] Adjust the order of check in KafkaTokenUtil.needTokenUpdate to remedy perf regression

### What changes were proposed in this pull request?

This PR proposes to adjust the order of check in KafkaTokenUtil.needTokenUpdate, so that short-circuit applies on the non-delegation token cases (insecure + secured without delegation token) and remedies the performance regression heavily.

### Why are the changes needed?

There's a serious performance regression between Spark 2.4 vs Spark 3.0 on read path against Kafka data source.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually ran a reproducer (https://github.com/codegorillauk/spark-kafka-read with modification to just count instead of writing to Kafka topic) with measuring the time.

> the branch applying the change with adding measurement

https://github.com/HeartSaVioR/spark/commits/debug-SPARK-33635-v3.0.1

> the branch only adding measurement

https://github.com/HeartSaVioR/spark/commits/debug-original-ver-SPARK-33635-v3.0.1

> the result (before the fix)

count: 10280000
Took 41.634007047 secs

21/01/06 13:16:07 INFO KafkaDataConsumer: debug ver. 17-original
21/01/06 13:16:07 INFO KafkaDataConsumer: Total time taken to retrieve: 82118 ms

> the result (after the fix)

count: 10280000
Took 7.964058475 secs

21/01/06 13:08:22 INFO KafkaDataConsumer: debug ver. 17
21/01/06 13:08:22 INFO KafkaDataConsumer: Total time taken to retrieve: 987 ms

Closes #31056 from HeartSaVioR/SPARK-33635.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
This commit is contained in:
Jungtaek Lim (HeartSaVioR) 2021-01-05 21:59:49 -08:00 committed by Dongjoon Hyun
parent b1c4fc7fc7
commit fa9309001a

View file

@ -273,8 +273,8 @@ private[spark] object KafkaTokenUtil extends Logging {
sparkConf: SparkConf,
params: ju.Map[String, Object],
clusterConfig: Option[KafkaTokenClusterConf]): Boolean = {
if (HadoopDelegationTokenManager.isServiceEnabled(sparkConf, "kafka") &&
clusterConfig.isDefined && params.containsKey(SaslConfigs.SASL_JAAS_CONFIG)) {
if (clusterConfig.isDefined && params.containsKey(SaslConfigs.SASL_JAAS_CONFIG) &&
HadoopDelegationTokenManager.isServiceEnabled(sparkConf, "kafka")) {
logDebug("Delegation token used by connector, checking if uses the latest token.")
val connectorJaasParams = params.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[String]
getTokenJaasParams(clusterConfig.get) != connectorJaasParams