Update MQTTWordCount.scala
This commit is contained in:
parent
6ec39829e9
commit
dbafa11396
|
@ -30,7 +30,8 @@ import org.eclipse.paho.client.mqttv3.MqttMessage
|
|||
import org.eclipse.paho.client.mqttv3.MqttTopic
|
||||
|
||||
/**
|
||||
* A simple Mqtt publisher for demonstration purposes, repeatedly publishes Space separated String Message "hello mqtt demo for spark streaming"
|
||||
* A simple Mqtt publisher for demonstration purposes, repeatedly publishes
|
||||
* Space separated String Message "hello mqtt demo for spark streaming"
|
||||
*/
|
||||
|
||||
object MQTTPublisher {
|
||||
|
@ -99,13 +100,13 @@ object MQTTWordCount {
|
|||
|
||||
val Seq(master, brokerUrl, topic) = args.toSeq
|
||||
|
||||
val ssc = new StreamingContext(master, "MqttWordCount", Seconds(2), System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
|
||||
val ssc = new StreamingContext(master, "MqttWordCount", Seconds(2), System.getenv("SPARK_HOME"),
|
||||
Seq(System.getenv("SPARK_EXAMPLES_JAR")))
|
||||
val lines = ssc.mqttStream(brokerUrl, topic, StorageLevel.MEMORY_ONLY)
|
||||
|
||||
val words = lines.flatMap(x => x.toString.split(" "))
|
||||
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
|
||||
wordCounts.print()
|
||||
|
||||
ssc.start()
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue