[SPARK-7396] [STREAMING] [EXAMPLE] Update KafkaWordCountProducer to use new Producer API
Otherwise it will throw exception: ``` Exception in thread "main" kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries. at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90) at kafka.producer.Producer.send(Producer.scala:77) at org.apache.spark.examples.streaming.KafkaWordCountProducer$.main(KafkaWordCount.scala:96) at org.apache.spark.examples.streaming.KafkaWordCountProducer.main(KafkaWordCount.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:623) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) ``` Author: jerryshao <saisai.shao@intel.com> Closes #5936 from jerryshao/SPARK-7396 and squashes the following commits: 270bbe2 [jerryshao] Fix Kafka Produce throw Exception issue
This commit is contained in:
parent
4e930420c1
commit
316a5c0423
|
@ -17,9 +17,9 @@
|
|||
|
||||
package org.apache.spark.examples.streaming
|
||||
|
||||
import java.util.Properties
|
||||
import java.util.HashMap
|
||||
|
||||
import kafka.producer._
|
||||
import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord}
|
||||
|
||||
import org.apache.spark.streaming._
|
||||
import org.apache.spark.streaming.kafka._
|
||||
|
@ -77,23 +77,25 @@ object KafkaWordCountProducer {
|
|||
val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
|
||||
|
||||
// Zookeeper connection properties
|
||||
val props = new Properties()
|
||||
props.put("metadata.broker.list", brokers)
|
||||
props.put("serializer.class", "kafka.serializer.StringEncoder")
|
||||
val props = new HashMap[String, Object]()
|
||||
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
|
||||
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
|
||||
"org.apache.kafka.common.serialization.StringSerializer")
|
||||
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
|
||||
"org.apache.kafka.common.serialization.StringSerializer")
|
||||
|
||||
val config = new ProducerConfig(props)
|
||||
val producer = new Producer[String, String](config)
|
||||
val producer = new KafkaProducer[String, String](props)
|
||||
|
||||
// Send some messages
|
||||
while(true) {
|
||||
val messages = (1 to messagesPerSec.toInt).map { messageNum =>
|
||||
(1 to messagesPerSec.toInt).foreach { messageNum =>
|
||||
val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
|
||||
.mkString(" ")
|
||||
|
||||
new KeyedMessage[String, String](topic, str)
|
||||
}.toArray
|
||||
val message = new ProducerRecord[String, String](topic, null, str)
|
||||
producer.send(message)
|
||||
}
|
||||
|
||||
producer.send(messages: _*)
|
||||
Thread.sleep(100)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue