diff --git a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala index bc790418de..f3f6b4de6f 100644 --- a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala +++ b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala @@ -273,8 +273,8 @@ private[spark] object KafkaTokenUtil extends Logging { sparkConf: SparkConf, params: ju.Map[String, Object], clusterConfig: Option[KafkaTokenClusterConf]): Boolean = { - if (HadoopDelegationTokenManager.isServiceEnabled(sparkConf, "kafka") && - clusterConfig.isDefined && params.containsKey(SaslConfigs.SASL_JAAS_CONFIG)) { + if (clusterConfig.isDefined && params.containsKey(SaslConfigs.SASL_JAAS_CONFIG) && + HadoopDelegationTokenManager.isServiceEnabled(sparkConf, "kafka")) { logDebug("Delegation token used by connector, checking if uses the latest token.") val connectorJaasParams = params.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[String] getTokenJaasParams(clusterConfig.get) != connectorJaasParams