[SPARK-16212][STREAMING][KAFKA] use random port for embedded kafka
## What changes were proposed in this pull request? Testing for 0.10 uncovered an issue with a fixed port number being used in KafkaTestUtils. This is making a roughly equivalent fix for the 0.8 connector ## How was this patch tested? Unit tests, manual tests Author: cody koeninger <cody@koeninger.org> Closes #14018 from koeninger/kafka-0-8-test-port.
This commit is contained in:
parent
16a2a7d714
commit
1fca9da95d
|
@ -35,6 +35,7 @@ import kafka.serializer.StringEncoder
|
|||
import kafka.server.{KafkaConfig, KafkaServer}
|
||||
import kafka.utils.{ZKStringSerializer, ZkUtils}
|
||||
import org.I0Itec.zkclient.ZkClient
|
||||
import org.apache.commons.lang3.RandomUtils
|
||||
import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
|
||||
|
||||
import org.apache.spark.SparkConf
|
||||
|
@ -62,7 +63,8 @@ private[kafka] class KafkaTestUtils extends Logging {
|
|||
|
||||
// Kafka broker related configurations
|
||||
private val brokerHost = "localhost"
|
||||
private var brokerPort = 9092
|
||||
// 0.8.2 server doesn't have a boundPort method, so can't use 0 for a random port
|
||||
private var brokerPort = RandomUtils.nextInt(1024, 65536)
|
||||
private var brokerConf: KafkaConfig = _
|
||||
|
||||
// Kafka broker server
|
||||
|
@ -112,7 +114,7 @@ private[kafka] class KafkaTestUtils extends Logging {
|
|||
brokerConf = new KafkaConfig(brokerConfiguration)
|
||||
server = new KafkaServer(brokerConf)
|
||||
server.startup()
|
||||
(server, port)
|
||||
(server, brokerPort)
|
||||
}, new SparkConf(), "KafkaBroker")
|
||||
|
||||
brokerReady = true
|
||||
|
|
Loading…
Reference in a new issue