[SPARK-16776][STREAMING] Replace deprecated API in KafkaTestUtils for 0.10.0.
## What changes were proposed in this pull request? This PR replaces the old Kafka API to 0.10.0 ones in `KafkaTestUtils`. The change include: - `Producer` to `KafkaProducer` - Change configurations to equalvant ones. (I referred [here](http://kafka.apache.org/documentation.html#producerconfigs) for 0.10.0 and [here](http://kafka.apache.org/082/documentation.html#producerconfigs ) for old, 0.8.2). This PR will remove the build warning as below: ```scala [WARNING] .../spark/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala:71: class Producer in package producer is deprecated: This class has been deprecated and will be removed in a future release. Please use org.apache.kafka.clients.producer.KafkaProducer instead. [WARNING] private var producer: Producer[String, String] = _ [WARNING] ^ [WARNING] .../spark/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala:181: class Producer in package producer is deprecated: This class has been deprecated and will be removed in a future release. Please use org.apache.kafka.clients.producer.KafkaProducer instead. [WARNING] producer = new Producer[String, String](new ProducerConfig(producerConfiguration)) [WARNING] ^ [WARNING] .../spark/streaming/kafka010/KafkaTestUtils.scala:181: class ProducerConfig in package producer is deprecated: This class has been deprecated and will be removed in a future release. Please use org.apache.kafka.clients.producer.ProducerConfig instead. [WARNING] producer = new Producer[String, String](new ProducerConfig(producerConfiguration)) [WARNING] ^ [WARNING] .../spark/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala:182: class KeyedMessage in package producer is deprecated: This class has been deprecated and will be removed in a future release. Please use org.apache.kafka.clients.producer.ProducerRecord instead. [WARNING] producer.send(messages.map { new KeyedMessage[String, String](topic, _ ) }: _*) [WARNING] ^ [WARNING] four warnings found [WARNING] warning: [options] bootstrap class path not set in conjunction with -source 1.7 [WARNING] 1 warning ``` ## How was this patch tested? Existing tests that use `KafkaTestUtils` should cover this. Author: hyukjinkwon <gurwls223@gmail.com> Closes #14416 from HyukjinKwon/SPARK-16776.
This commit is contained in:
parent
1e9b59b73b
commit
f93ad4fe7c
|
@ -30,10 +30,10 @@ import scala.util.control.NonFatal
|
|||
|
||||
import kafka.admin.AdminUtils
|
||||
import kafka.api.Request
|
||||
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
|
||||
import kafka.serializer.StringEncoder
|
||||
import kafka.server.{KafkaConfig, KafkaServer}
|
||||
import kafka.utils.ZkUtils
|
||||
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
|
||||
import org.apache.kafka.common.serialization.StringSerializer
|
||||
import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
|
||||
|
||||
import org.apache.spark.SparkConf
|
||||
|
@ -68,7 +68,7 @@ private[kafka010] class KafkaTestUtils extends Logging {
|
|||
private var server: KafkaServer = _
|
||||
|
||||
// Kafka producer
|
||||
private var producer: Producer[String, String] = _
|
||||
private var producer: KafkaProducer[String, String] = _
|
||||
|
||||
// Flag to test whether the system is correctly started
|
||||
private var zkReady = false
|
||||
|
@ -178,8 +178,10 @@ private[kafka010] class KafkaTestUtils extends Logging {
|
|||
|
||||
/** Send the array of messages to the Kafka broker */
|
||||
def sendMessages(topic: String, messages: Array[String]): Unit = {
|
||||
producer = new Producer[String, String](new ProducerConfig(producerConfiguration))
|
||||
producer.send(messages.map { new KeyedMessage[String, String](topic, _ ) }: _*)
|
||||
producer = new KafkaProducer[String, String](producerConfiguration)
|
||||
messages.foreach { message =>
|
||||
producer.send(new ProducerRecord[String, String](topic, message))
|
||||
}
|
||||
producer.close()
|
||||
producer = null
|
||||
}
|
||||
|
@ -198,10 +200,12 @@ private[kafka010] class KafkaTestUtils extends Logging {
|
|||
|
||||
private def producerConfiguration: Properties = {
|
||||
val props = new Properties()
|
||||
props.put("metadata.broker.list", brokerAddress)
|
||||
props.put("serializer.class", classOf[StringEncoder].getName)
|
||||
props.put("bootstrap.servers", brokerAddress)
|
||||
props.put("value.serializer", classOf[StringSerializer].getName)
|
||||
// Key serializer is required.
|
||||
props.put("key.serializer", classOf[StringSerializer].getName)
|
||||
// wait for all in-sync replicas to ack sends
|
||||
props.put("request.required.acks", "-1")
|
||||
props.put("acks", "all")
|
||||
props
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue