From fa9309001a47a2b87f7a735f964537886ed9bd4c Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Tue, 5 Jan 2021 21:59:49 -0800
Subject: [PATCH] [SPARK-33635][SS] Adjust the order of check in KafkaTokenUtil.needTokenUpdate to remedy perf regression

### What changes were proposed in this pull request?

This PR proposes to adjust the order of check in KafkaTokenUtil.needTokenUpdate, so that short-circuit applies on the non-delegation token cases (insecure + secured without delegation token) and remedies the performance regression heavily.

### Why are the changes needed?

There's a serious performance regression between Spark 2.4 vs Spark 3.0 on read path against Kafka data source.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually ran a reproducer (https://github.com/codegorillauk/spark-kafka-read with modification to just count instead of writing to Kafka topic) with measuring the time.

> the branch applying the change with adding measurement

https://github.com/HeartSaVioR/spark/commits/debug-SPARK-33635-v3.0.1

> the branch only adding measurement

https://github.com/HeartSaVioR/spark/commits/debug-original-ver-SPARK-33635-v3.0.1

> the result (before the fix)

count: 10280000
Took 41.634007047 secs

21/01/06 13:16:07 INFO KafkaDataConsumer: debug ver. 17-original
21/01/06 13:16:07 INFO KafkaDataConsumer: Total time taken to retrieve: 82118 ms

> the result (after the fix)

count: 10280000
Took 7.964058475 secs

21/01/06 13:08:22 INFO KafkaDataConsumer: debug ver. 17
21/01/06 13:08:22 INFO KafkaDataConsumer: Total time taken to retrieve: 987 ms

Closes #31056 from HeartSaVioR/SPARK-33635.

Authored-by: Jungtaek Lim (HeartSaVioR)
Signed-off-by: Dongjoon Hyun
---
 .../main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
index bc790418de..f3f6b4de6f 100644
--- a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
+++ b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
@@ -273,8 +273,8 @@ private[spark] object KafkaTokenUtil extends Logging {
       sparkConf: SparkConf,
       params: ju.Map[String, Object],
       clusterConfig: Option[KafkaTokenClusterConf]): Boolean = {
-    if (HadoopDelegationTokenManager.isServiceEnabled(sparkConf, "kafka") &&
-        clusterConfig.isDefined && params.containsKey(SaslConfigs.SASL_JAAS_CONFIG)) {
+    if (clusterConfig.isDefined && params.containsKey(SaslConfigs.SASL_JAAS_CONFIG) &&
+        HadoopDelegationTokenManager.isServiceEnabled(sparkConf, "kafka")) {
       logDebug("Delegation token used by connector, checking if uses the latest token.")
       val connectorJaasParams = params.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[String]
       getTokenJaasParams(clusterConfig.get) != connectorJaasParams
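
Note on the reordering, as a minimal sketch rather than Spark code: `&&` in Scala evaluates left to right and stops at the first false operand, so putting the cheap `clusterConfig.isDefined` and `containsKey` checks first means the service-enabled lookup is only reached when a delegation token could actually be in use. The object, the method names, and the call counter below are hypothetical stand-ins and not part of Spark's API; the sketch also assumes the service-enabled check is the costly operand, which the measurements in the description suggest. The only borrowed detail is the `sasl.jaas.config` key, which is the value of `SaslConfigs.SASL_JAAS_CONFIG`.

```scala
// Hypothetical stand-ins for illustration only; none of these names exist in Spark.
object ShortCircuitOrderSketch {
  // Counts how often the "expensive" check actually runs.
  var expensiveCalls: Int = 0

  // Stand-in for a costly, configuration-driven check (assumed to be expensive).
  def expensiveCheck(): Boolean = {
    expensiveCalls += 1
    true
  }

  // Expensive check first: it runs on every call, whatever the cheap checks say.
  def expensiveFirst(clusterConfig: Option[String], params: Map[String, String]): Boolean =
    expensiveCheck() && clusterConfig.isDefined && params.contains("sasl.jaas.config")

  // Cheap checks first: && short-circuits, so the expensive check is skipped
  // whenever there is no cluster config or no JAAS entry.
  def cheapFirst(clusterConfig: Option[String], params: Map[String, String]): Boolean =
    clusterConfig.isDefined && params.contains("sasl.jaas.config") && expensiveCheck()

  def main(args: Array[String]): Unit = {
    // The non-delegation-token case: no cluster config, no JAAS parameter.
    val clusterConfig: Option[String] = None
    val params = Map.empty[String, String]

    expensiveCalls = 0
    (1 to 1000).foreach(_ => expensiveFirst(clusterConfig, params))
    println(s"expensive-first calls: $expensiveCalls")

    expensiveCalls = 0
    (1 to 1000).foreach(_ => cheapFirst(clusterConfig, params))
    println(s"cheap-first calls: $expensiveCalls")
  }
}
```

For the no-token inputs used in `main`, the expensive-first ordering invokes the stand-in check 1000 times and the cheap-first ordering invokes it 0 times, while both orderings return the same Boolean result.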