[SPARK-28144][SPARK-29294][SS] Upgrade Kafka to 2.4.0

### What changes were proposed in this pull request?

This patch upgrades Kafka to 2.4.0, which supports Scala 2.13.

There are some incompatible changes in Kafka 2.4 which this patch addresses as well (a condensed sketch of the client-side replacements follows the list):

* `ZkUtils` has been removed -> replaced with `KafkaZkClient`
* The majority of methods in `AdminUtils` have been removed -> replaced with `AdminZkClient`
* The method signature of `Scheduler.schedule` has changed (return type) -> leverage `DeterministicScheduler` to avoid implementing `ScheduledFuture` by hand (a usage sketch follows the `MockTime` diff below)
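
For reference, a minimal sketch of the ZooKeeper-client replacements, condensed from the diffs below (the connection string, timeouts, and topic settings are placeholder values, not Spark's actual configuration):

```scala
import java.util.Properties

import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.common.utils.SystemTime

// ZkUtils(connectString, sessionTimeout, connectionTimeout, isSecure) is gone.
// KafkaZkClient.apply additionally takes a max-in-flight-requests bound and a Time instance.
val zkClient = KafkaZkClient(
  "localhost:2181", isSecure = false, 10000, 10000, 1, new SystemTime())

// AdminZkClient wraps the ZK client and picks up most of what AdminUtils used to offer,
// e.g. AdminUtils.createTopic(zkUtils, topic, partitions, replication, config) becomes:
val adminZkClient = new AdminZkClient(zkClient)
adminZkClient.createTopic("test-topic", 3, 1, new Properties())

zkClient.close()
```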

### Why are the changes needed?

* Kafka 2.4 supports Scala 2.13

### Does this PR introduce any user-facing change?

No, as the Kafka client API is known to be compatible across versions.

### How was this patch tested?

Existing UTs

Closes #26960 from HeartSaVioR/SPARK-29294.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
Authored by Jungtaek Lim (HeartSaVioR) on 2019-12-21 14:01:25 -08:00; committed by Dongjoon Hyun
commit 8384ff4c9d (parent fa47b7faf7)
6 changed files with 68 additions and 78 deletions

@@ -30,9 +30,9 @@ import scala.util.Random
 import com.google.common.io.Files
 import kafka.api.Request
-import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.server.{HostedPartition, KafkaConfig, KafkaServer}
 import kafka.server.checkpoints.OffsetCheckpointFile
-import kafka.utils.ZkUtils
+import kafka.zk.KafkaZkClient
 import org.apache.hadoop.minikdc.MiniKdc
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.kafka.clients.CommonClientConfigs
@@ -44,6 +44,7 @@ import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.SecurityProtocol.{PLAINTEXT, SASL_PLAINTEXT}
 import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
+import org.apache.kafka.common.utils.SystemTime
 import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
 import org.apache.zookeeper.server.auth.SASLAuthenticationProvider
 import org.scalatest.Assertions._
@@ -81,7 +82,7 @@ class KafkaTestUtils(
   private val zkSessionTimeout = 10000

   private var zookeeper: EmbeddedZookeeper = _
-  private var zkUtils: ZkUtils = _
+  private var zkClient: KafkaZkClient = _

   // Kafka broker related configurations
   private val brokerHost = localCanonicalHostName
@@ -115,9 +116,9 @@
     s"$brokerHost:$brokerPort"
   }

-  def zookeeperClient: ZkUtils = {
+  def zookeeperClient: KafkaZkClient = {
     assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper client")
-    Option(zkUtils).getOrElse(
+    Option(zkClient).getOrElse(
       throw new IllegalStateException("Zookeeper client is not yet initialized"))
   }
@@ -243,7 +244,8 @@
     zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
     // Get the actual zookeeper binding port
     zkPort = zookeeper.actualPort
-    zkUtils = ZkUtils(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout, false)
+    zkClient = KafkaZkClient(s"$zkHost:$zkPort", isSecure = false, zkSessionTimeout,
+      zkConnectionTimeout, 1, new SystemTime())
     zkReady = true
   }
@@ -288,7 +290,7 @@
     setupEmbeddedZookeeper()
     setupEmbeddedKafkaServer()
     eventually(timeout(1.minute)) {
-      assert(zkUtils.getAllBrokersInCluster().nonEmpty, "Broker was not up in 60 seconds")
+      assert(zkClient.getAllBrokersInCluster.nonEmpty, "Broker was not up in 60 seconds")
     }
   }
@@ -335,9 +337,9 @@
       }
     }

-    if (zkUtils != null) {
-      zkUtils.close()
-      zkUtils = null
+    if (zkClient != null) {
+      zkClient.close()
+      zkClient = null
     }

     if (zookeeper != null) {
@@ -367,7 +369,7 @@
     var created = false
     while (!created) {
       try {
-        val newTopic = new NewTopic(topic, partitions, 1)
+        val newTopic = new NewTopic(topic, partitions, 1.shortValue())
         adminClient.createTopics(Collections.singleton(newTopic))
         created = true
       } catch {
@@ -384,7 +386,7 @@
   }

   def getAllTopicsAndPartitionSize(): Seq[(String, Int)] = {
-    zkUtils.getPartitionsForTopics(zkUtils.getAllTopics()).mapValues(_.size).toSeq
+    zkClient.getPartitionsForTopics(zkClient.getAllTopicsInCluster).mapValues(_.size).toSeq
   }

   /** Create a Kafka topic and wait until it is propagated to the whole cluster */
@@ -394,9 +396,9 @@

   /** Delete a Kafka topic and wait until it is propagated to the whole cluster */
   def deleteTopic(topic: String): Unit = {
-    val partitions = zkUtils.getPartitionsForTopics(Seq(topic))(topic).size
+    val partitions = zkClient.getPartitionsForTopics(Set(topic))(topic).size
     adminClient.deleteTopics(Collections.singleton(topic))
-    verifyTopicDeletionWithRetries(zkUtils, topic, partitions, List(this.server))
+    verifyTopicDeletionWithRetries(topic, partitions, List(this.server))
   }

   /** Add new partitions to a Kafka topic */
@@ -575,15 +577,12 @@
       servers: Seq[KafkaServer]): Unit = {
     val topicAndPartitions = (0 until numPartitions).map(new TopicPartition(topic, _))

-    import ZkUtils._
     // wait until admin path for delete topic is deleted, signaling completion of topic deletion
-    assert(
-      !zkUtils.pathExists(getDeleteTopicPath(topic)),
-      s"${getDeleteTopicPath(topic)} still exists")
-    assert(!zkUtils.pathExists(getTopicPath(topic)), s"${getTopicPath(topic)} still exists")
+    assert(!zkClient.isTopicMarkedForDeletion(topic), "topic is still marked for deletion")
+    assert(!zkClient.topicExists(topic), "topic still exists")
     // ensure that the topic-partition has been deleted from all brokers' replica managers
     assert(servers.forall(server => topicAndPartitions.forall(tp =>
-      server.replicaManager.getPartition(tp) == None)),
+      server.replicaManager.getPartition(tp) == HostedPartition.None)),
       s"topic $topic still exists in the replica manager")
     // ensure that logs from all replicas are deleted if delete topic is marked successful
     assert(servers.forall(server => topicAndPartitions.forall(tp =>
@@ -598,13 +597,12 @@
       }), s"checkpoint for topic $topic still exists")
     // ensure the topic is gone
     assert(
-      !zkUtils.getAllTopics().contains(topic),
+      !zkClient.getAllTopicsInCluster.contains(topic),
       s"topic $topic still exists on zookeeper")
   }

   /** Verify topic is deleted. Retry to delete the topic if not. */
   private def verifyTopicDeletionWithRetries(
-      zkUtils: ZkUtils,
       topic: String,
       numPartitions: Int,
       servers: Seq[KafkaServer]): Unit = {
@@ -626,9 +624,9 @@
     def isPropagated = server.dataPlaneRequestProcessor.metadataCache
       .getPartitionInfo(topic, partition) match {
       case Some(partitionState) =>
-        zkUtils.getLeaderForPartition(topic, partition).isDefined &&
-          Request.isValidBrokerId(partitionState.basePartitionState.leader) &&
-          !partitionState.basePartitionState.replicas.isEmpty
+        zkClient.getLeaderForPartition(new TopicPartition(topic, partition)).isDefined &&
+          Request.isValidBrokerId(partitionState.leader) &&
+          !partitionState.replicas.isEmpty
       case _ =>
         false
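
A note on the `HostedPartition.None` comparison above: in Kafka 2.4, `ReplicaManager.getPartition` no longer returns an `Option[Partition]` (the old code compared against Scala's `None`) but a dedicated `HostedPartition` type, so the "no replica hosted" case is now its own object. A minimal sketch of the assumed shape of that API, inferred from the diff:

```scala
import kafka.server.HostedPartition

// getPartition yields HostedPartition rather than Option[Partition] in Kafka 2.4.
server.replicaManager.getPartition(tp) match {
  case HostedPartition.None => // this broker hosts no replica of the partition
  case other => // a hosted replica; details depend on Kafka internals
}
```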

@@ -111,6 +111,11 @@
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-tags_${scala.binary.version}</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.jmock</groupId>
+      <artifactId>jmock-junit4</artifactId>
+      <scope>test</scope>
+    </dependency>
     <!--
       This spark-tags test-dep is needed even though it isn't used in this module, otherwise testing-cmds that exclude

@@ -27,13 +27,14 @@ import scala.annotation.tailrec
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal

-import kafka.admin.AdminUtils
 import kafka.api.Request
 import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.ZkUtils
+import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.serialization.StringSerializer
+import org.apache.kafka.common.utils.SystemTime
 import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}

 import org.apache.spark.{SparkConf, SparkException}
@@ -57,7 +58,8 @@ private[kafka010] class KafkaTestUtils extends Logging {

   private var zookeeper: EmbeddedZookeeper = _

-  private var zkUtils: ZkUtils = _
+  private var zkClient: KafkaZkClient = _
+  private var admClient: AdminZkClient = _

   // Kafka broker related configurations
   private val brokerHost = "127.0.0.1"
@@ -85,19 +87,27 @@ private[kafka010] class KafkaTestUtils extends Logging {
     s"$brokerHost:$brokerPort"
   }

-  def zookeeperClient: ZkUtils = {
+  def zookeeperClient: KafkaZkClient = {
     assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper client")
-    Option(zkUtils).getOrElse(
+    Option(zkClient).getOrElse(
       throw new IllegalStateException("Zookeeper client is not yet initialized"))
   }

+  def adminClient: AdminZkClient = {
+    assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper client")
+    Option(admClient).getOrElse(
+      throw new IllegalStateException("Admin client is not yet initialized"))
+  }
+
   // Set up the Embedded Zookeeper server and get the proper Zookeeper port
   private def setupEmbeddedZookeeper(): Unit = {
     // Zookeeper server startup
     zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
     // Get the actual zookeeper binding port
     zkPort = zookeeper.actualPort
-    zkUtils = ZkUtils(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout, false)
+    zkClient = KafkaZkClient(s"$zkHost:$zkPort", isSecure = false, zkSessionTimeout,
+      zkConnectionTimeout, 1, new SystemTime())
+    admClient = new AdminZkClient(zkClient)
     zkReady = true
   }
@@ -162,9 +172,9 @@ private[kafka010] class KafkaTestUtils extends Logging {
       }
     }

-    if (zkUtils != null) {
-      zkUtils.close()
-      zkUtils = null
+    if (zkClient != null) {
+      zkClient.close()
+      zkClient = null
     }

     if (zookeeper != null) {
@@ -175,7 +185,7 @@ private[kafka010] class KafkaTestUtils extends Logging {

   /** Create a Kafka topic and wait until it is propagated to the whole cluster */
   def createTopic(topic: String, partitions: Int, config: Properties): Unit = {
-    AdminUtils.createTopic(zkUtils, topic, partitions, 1, config)
+    adminClient.createTopic(topic, partitions, 1, config)
     // wait until metadata is propagated
     (0 until partitions).foreach { p =>
       waitUntilMetadataIsPropagated(topic, p)
@@ -289,9 +299,9 @@ private[kafka010] class KafkaTestUtils extends Logging {
     def isPropagated = server.dataPlaneRequestProcessor.metadataCache
       .getPartitionInfo(topic, partition) match {
       case Some(partitionState) =>
-        val leader = partitionState.basePartitionState.leader
-        val isr = partitionState.basePartitionState.isr
-        zkUtils.getLeaderForPartition(topic, partition).isDefined &&
+        val leader = partitionState.leader
+        val isr = partitionState.isr
+        zkClient.getLeaderForPartition(new TopicPartition(topic, partition)).isDefined &&
           Request.isValidBrokerId(leader) && !isr.isEmpty
       case _ =>
         false

@@ -17,12 +17,13 @@
 package org.apache.spark.streaming.kafka010.mocks

-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{ScheduledFuture, TimeUnit}

 import scala.collection.mutable.PriorityQueue

 import kafka.utils.Scheduler
 import org.apache.kafka.common.utils.Time
+import org.jmock.lib.concurrent.DeterministicScheduler

 /**
  * A mock scheduler that executes tasks synchronously using a mock time instance.
@@ -41,36 +42,18 @@ import org.apache.kafka.common.utils.Time
  */
 private[kafka010] class MockScheduler(val time: Time) extends Scheduler {

-  /* a priority queue of tasks ordered by next execution time */
-  var tasks = new PriorityQueue[MockTask]()
+  val scheduler = new DeterministicScheduler()

   def isStarted: Boolean = true

   def startup(): Unit = {}

   def shutdown(): Unit = synchronized {
-    tasks.foreach(_.fun())
-    tasks.clear()
+    scheduler.runUntilIdle()
   }

-  /**
-   * Check for any tasks that need to execute. Since this is a mock scheduler this check only occurs
-   * when this method is called and the execution happens synchronously in the calling thread.
-   * If you are using the scheduler associated with a MockTime instance this call
-   * will be triggered automatically.
-   */
-  def tick(): Unit = synchronized {
-    val now = time.milliseconds
-    while(!tasks.isEmpty && tasks.head.nextExecution <= now) {
-      /* pop and execute the task with the lowest next execution time */
-      val curr = tasks.dequeue
-      curr.fun()
-      /* if the task is periodic, reschedule it and re-enqueue */
-      if(curr.periodic) {
-        curr.nextExecution += curr.period
-        this.tasks += curr
-      }
-    }
+  def tick(duration: Long, timeUnit: TimeUnit): Unit = synchronized {
+    scheduler.tick(duration, timeUnit)
   }

   def schedule(
@@ -78,20 +61,14 @@ private[kafka010] class MockScheduler(val time: Time) extends Scheduler {
       fun: () => Unit,
       delay: Long = 0,
       period: Long = -1,
-      unit: TimeUnit = TimeUnit.MILLISECONDS): Unit = synchronized {
-    tasks += MockTask(name, fun, time.milliseconds + delay, period = period)
-    tick()
-  }
-
-}
-
-case class MockTask(
-    val name: String,
-    val fun: () => Unit,
-    var nextExecution: Long,
-    val period: Long) extends Ordered[MockTask] {
-  def periodic: Boolean = period >= 0
-  def compare(t: MockTask): Int = {
-    java.lang.Long.compare(t.nextExecution, nextExecution)
+      unit: TimeUnit = TimeUnit.MILLISECONDS): ScheduledFuture[_] = synchronized {
+    val runnable = new Runnable {
+      override def run(): Unit = fun()
+    }
+    if (period >= 0) {
+      scheduler.scheduleAtFixedRate(runnable, delay, period, unit)
+    } else {
+      scheduler.schedule(runnable, delay, unit)
+    }
   }
 }

@@ -47,7 +47,7 @@ private[kafka010] class MockTime(@volatile private var currentMs: Long) extends

   override def sleep(ms: Long): Unit = {
     this.currentMs += ms
-    scheduler.tick()
+    scheduler.tick(ms, TimeUnit.MILLISECONDS)
   }

   override def waitObject(obj: Any, condition: Supplier[lang.Boolean], timeoutMs: Long): Unit =
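
Taken together, `MockScheduler` and `MockTime` now delegate the virtual-time bookkeeping to jMock's `DeterministicScheduler`: `MockTime.sleep(ms)` advances the mock clock and ticks the scheduler, which synchronously runs every task falling due in that window. A usage sketch (assuming `MockTime` exposes its `MockScheduler` as `scheduler`, as the `sleep` implementation above suggests):

```scala
val time = new MockTime(0L)
var runs = 0

// Periodic task: first run after 100 ms of mock time, then every 100 ms.
time.scheduler.schedule("counter", () => runs += 1, delay = 100, period = 100)

// Advancing mock time by 250 ms deterministically fires the runs due at
// t=100 and t=200 on the calling thread; no background threads are involved.
time.sleep(250)
assert(runs == 2)
```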

@@ -132,7 +132,7 @@
     <!-- Version used for internal directory structure -->
     <hive.version.short>2.3</hive.version.short>
     <!-- note that this should be compatible with Kafka brokers version 0.10 and up -->
-    <kafka.version>2.3.1</kafka.version>
+    <kafka.version>2.4.0</kafka.version>
     <derby.version>10.12.1.1</derby.version>
     <parquet.version>1.10.1</parquet.version>
     <orc.version>1.5.8</orc.version>