diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index 47ae7be85c..7b972fede9 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -30,9 +30,9 @@ import scala.util.Random import com.google.common.io.Files import kafka.api.Request -import kafka.server.{KafkaConfig, KafkaServer} +import kafka.server.{HostedPartition, KafkaConfig, KafkaServer} import kafka.server.checkpoints.OffsetCheckpointFile -import kafka.utils.ZkUtils +import kafka.zk.KafkaZkClient import org.apache.hadoop.minikdc.MiniKdc import org.apache.hadoop.security.UserGroupInformation import org.apache.kafka.clients.CommonClientConfigs @@ -44,6 +44,7 @@ import org.apache.kafka.common.config.SaslConfigs import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.security.auth.SecurityProtocol.{PLAINTEXT, SASL_PLAINTEXT} import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer} +import org.apache.kafka.common.utils.SystemTime import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer} import org.apache.zookeeper.server.auth.SASLAuthenticationProvider import org.scalatest.Assertions._ @@ -81,7 +82,7 @@ class KafkaTestUtils( private val zkSessionTimeout = 10000 private var zookeeper: EmbeddedZookeeper = _ - private var zkUtils: ZkUtils = _ + private var zkClient: KafkaZkClient = _ // Kafka broker related configurations private val brokerHost = localCanonicalHostName @@ -115,9 +116,9 @@ class KafkaTestUtils( s"$brokerHost:$brokerPort" } - def zookeeperClient: ZkUtils = { + def zookeeperClient: KafkaZkClient = { assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper client") - Option(zkUtils).getOrElse( + Option(zkClient).getOrElse( throw new IllegalStateException("Zookeeper client is not yet initialized")) } @@ -243,7 +244,8 @@ class KafkaTestUtils( zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort") // Get the actual zookeeper binding port zkPort = zookeeper.actualPort - zkUtils = ZkUtils(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout, false) + zkClient = KafkaZkClient(s"$zkHost:$zkPort", isSecure = false, zkSessionTimeout, + zkConnectionTimeout, 1, new SystemTime()) zkReady = true } @@ -288,7 +290,7 @@ class KafkaTestUtils( setupEmbeddedZookeeper() setupEmbeddedKafkaServer() eventually(timeout(1.minute)) { - assert(zkUtils.getAllBrokersInCluster().nonEmpty, "Broker was not up in 60 seconds") + assert(zkClient.getAllBrokersInCluster.nonEmpty, "Broker was not up in 60 seconds") } } @@ -335,9 +337,9 @@ class KafkaTestUtils( } } - if (zkUtils != null) { - zkUtils.close() - zkUtils = null + if (zkClient != null) { + zkClient.close() + zkClient = null } if (zookeeper != null) { @@ -367,7 +369,7 @@ class KafkaTestUtils( var created = false while (!created) { try { - val newTopic = new NewTopic(topic, partitions, 1) + val newTopic = new NewTopic(topic, partitions, 1.shortValue()) adminClient.createTopics(Collections.singleton(newTopic)) created = true } catch { @@ -384,7 +386,7 @@ class KafkaTestUtils( } def getAllTopicsAndPartitionSize(): Seq[(String, Int)] = { - zkUtils.getPartitionsForTopics(zkUtils.getAllTopics()).mapValues(_.size).toSeq + zkClient.getPartitionsForTopics(zkClient.getAllTopicsInCluster).mapValues(_.size).toSeq } /** Create a Kafka topic and wait until it is propagated to the whole cluster */ @@ -394,9 +396,9 @@ class KafkaTestUtils( /** Delete a Kafka topic and wait until it is propagated to the whole cluster */ def deleteTopic(topic: String): Unit = { - val partitions = zkUtils.getPartitionsForTopics(Seq(topic))(topic).size + val partitions = zkClient.getPartitionsForTopics(Set(topic))(topic).size adminClient.deleteTopics(Collections.singleton(topic)) - verifyTopicDeletionWithRetries(zkUtils, topic, partitions, List(this.server)) + verifyTopicDeletionWithRetries(topic, partitions, List(this.server)) } /** Add new partitions to a Kafka topic */ @@ -575,15 +577,12 @@ class KafkaTestUtils( servers: Seq[KafkaServer]): Unit = { val topicAndPartitions = (0 until numPartitions).map(new TopicPartition(topic, _)) - import ZkUtils._ // wait until admin path for delete topic is deleted, signaling completion of topic deletion - assert( - !zkUtils.pathExists(getDeleteTopicPath(topic)), - s"${getDeleteTopicPath(topic)} still exists") - assert(!zkUtils.pathExists(getTopicPath(topic)), s"${getTopicPath(topic)} still exists") + assert(!zkClient.isTopicMarkedForDeletion(topic), "topic is still marked for deletion") + assert(!zkClient.topicExists(topic), "topic still exists") // ensure that the topic-partition has been deleted from all brokers' replica managers assert(servers.forall(server => topicAndPartitions.forall(tp => - server.replicaManager.getPartition(tp) == None)), + server.replicaManager.getPartition(tp) == HostedPartition.None)), s"topic $topic still exists in the replica manager") // ensure that logs from all replicas are deleted if delete topic is marked successful assert(servers.forall(server => topicAndPartitions.forall(tp => @@ -598,13 +597,12 @@ class KafkaTestUtils( }), s"checkpoint for topic $topic still exists") // ensure the topic is gone assert( - !zkUtils.getAllTopics().contains(topic), + !zkClient.getAllTopicsInCluster.contains(topic), s"topic $topic still exists on zookeeper") } /** Verify topic is deleted. Retry to delete the topic if not. */ private def verifyTopicDeletionWithRetries( - zkUtils: ZkUtils, topic: String, numPartitions: Int, servers: Seq[KafkaServer]): Unit = { @@ -626,9 +624,9 @@ class KafkaTestUtils( def isPropagated = server.dataPlaneRequestProcessor.metadataCache .getPartitionInfo(topic, partition) match { case Some(partitionState) => - zkUtils.getLeaderForPartition(topic, partition).isDefined && - Request.isValidBrokerId(partitionState.basePartitionState.leader) && - !partitionState.basePartitionState.replicas.isEmpty + zkClient.getLeaderForPartition(new TopicPartition(topic, partition)).isDefined && + Request.isValidBrokerId(partitionState.leader) && + !partitionState.replicas.isEmpty case _ => false diff --git a/external/kafka-0-10/pom.xml b/external/kafka-0-10/pom.xml index d11569d709..ad7a8b7e23 100644 --- a/external/kafka-0-10/pom.xml +++ b/external/kafka-0-10/pom.xml @@ -111,6 +111,11 @@ org.apache.spark spark-tags_${scala.binary.version} + + org.jmock + jmock-junit4 + test + 2.3 - 2.3.1 + 2.4.0 10.12.1.1 1.10.1 1.5.8