Added Kafka Wordcount producer

This commit is contained in:
Denny 2012-11-19 10:17:58 -08:00
parent 6757ed6a40
commit 5e2b0a3bf6
2 changed files with 52 additions and 25 deletions

View file

@ -1,5 +1,9 @@
package spark.streaming.examples
import java.util.Properties
import kafka.message.Message
import kafka.producer.SyncProducerConfig
import kafka.producer._
import spark.streaming._
import spark.streaming.StreamingContext._
import spark.storage.StorageLevel
@ -8,33 +12,57 @@ import spark.streaming.util.RawTextHelper._
object KafkaWordCount {
def main(args: Array[String]) {
if (args.length < 4) {
System.err.println("Usage: KafkaWordCount <master> <hostname> <port> <restore>")
if (args.length < 6) {
System.err.println("Usage: KafkaWordCount <master> <hostname> <port> <group> <topics> <numThreads>")
System.exit(1)
}
val ssc = args(3) match {
// Restore the stream from a checkpoint
case "true" =>
new StreamingContext("work/checkpoint")
case _ =>
val tmp = new StreamingContext(args(0), "KafkaWordCount")
val Array(master, hostname, port, group, topics, numThreads) = args
tmp.setBatchDuration(Seconds(2))
tmp.checkpoint("work/checkpoint", Seconds(10))
val ssc = new StreamingContext(master, "KafkaWordCount")
ssc.checkpoint("checkpoint")
ssc.setBatchDuration(Seconds(2))
val lines = tmp.kafkaStream[String](args(1), args(2).toInt, "test_group", Map("test" -> 1),
Map(KafkaPartitionKey(0,"test","test_group",0) -> 0l))
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1l)).reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2)
val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
val lines = ssc.kafkaStream[String](hostname, port.toInt, group, topicpMap)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1l)).reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2)
wordCounts.print()
wordCounts.persist().checkpoint(Seconds(10))
wordCounts.print()
tmp
}
ssc.start()
}
}
// Produces some random words between 1 and 100.
object KafkaWordCountProducer {
def main(args: Array[String]) {
if (args.length < 3) {
System.err.println("Usage: KafkaWordCountProducer <hostname> <port> <topic> <messagesPerSec> <wordsPerMessage>")
System.exit(1)
}
val Array(hostname, port, topic, messagesPerSec, wordsPerMessage) = args
// Zookeper connection properties
val props = new Properties()
props.put("zk.connect", hostname + ":" + port)
props.put("serializer.class", "kafka.serializer.StringEncoder")
val config = new ProducerConfig(props)
val producer = new Producer[String, String](config)
// Send some messages
while(true) {
val messages = (1 to messagesPerSec.toInt).map { messageNum =>
(1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString).mkString(" ")
}.toArray
println(messages.mkString(","))
val data = new ProducerData[String, String](topic, messages)
producer.send(data)
Thread.sleep(100)
}
}
}

View file

@ -171,8 +171,7 @@ class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String,
groupId, msgAndMetadata.topicInfo.partition.partId)
val offset = msgAndMetadata.topicInfo.getConsumeOffset
offsets.put(key, offset)
// TODO: Remove Logging
logInfo("Handled message: " + (key, offset).toString)
// logInfo("Handled message: " + (key, offset).toString)
// Keep on handling messages
true