Merge pull request #64 from prabeesh/master

MQTT Adapter for Spark Streaming

MQTT is a machine-to-machine (M2M)/Internet of Things connectivity protocol.
It was designed as an extremely lightweight publish/subscribe messaging transport. You may read more about it here http://mqtt.org/

Message Queue Telemetry Transport (MQTT) is an open message protocol for M2M communications. It enables the transfer of telemetry-style data in the form of messages from devices like sensors and actuators, to mobile phones, embedded systems on vehicles, or laptops and full scale computers.

The protocol was invented by Andy Stanford-Clark of IBM, and Arlen Nipper of Cirrus Link Solutions

This protocol enables a publish/subscribe messaging model in an extremely lightweight way. It is useful for connections with remote locations where line of code and network bandwidth is a constraint.

MQTT is one of the widely used protocol for 'Internet of Things'. This protocol is getting much attraction as anything and everything is getting connected to internet and they all produce data. Researchers and companies predict some 25 billion devices will be connected to the internet by 2015.

Plugin/Support for MQTT is available in popular MQs like RabbitMQ, ActiveMQ etc.

Support for MQTT in Spark will help people with Internet of Things (IoT) projects to use Spark Streaming for their real time data processing needs (from sensors and other embedded devices etc).
This commit is contained in:
Matei Zaharia 2013-10-23 15:07:59 -07:00
commit dd659642e7
5 changed files with 241 additions and 1 deletions

View file

@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.examples
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.MQTTReceiver
import org.apache.spark.storage.StorageLevel
import org.eclipse.paho.client.mqttv3.MqttClient
import org.eclipse.paho.client.mqttv3.MqttClientPersistence
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
import org.eclipse.paho.client.mqttv3.MqttException
import org.eclipse.paho.client.mqttv3.MqttMessage
import org.eclipse.paho.client.mqttv3.MqttTopic
/**
* A simple Mqtt publisher for demonstration purposes, repeatedly publishes
* Space separated String Message "hello mqtt demo for spark streaming"
*/
object MQTTPublisher {
var client: MqttClient = _
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println("Usage: MQTTPublisher <MqttBrokerUrl> <topic>")
System.exit(1)
}
val Seq(brokerUrl, topic) = args.toSeq
try {
var peristance:MqttClientPersistence =new MqttDefaultFilePersistence("/tmp")
client = new MqttClient(brokerUrl, MqttClient.generateClientId(), peristance)
} catch {
case e: MqttException => println("Exception Caught: " + e)
}
client.connect()
val msgtopic: MqttTopic = client.getTopic(topic);
val msg: String = "hello mqtt demo for spark streaming"
while (true) {
val message: MqttMessage = new MqttMessage(String.valueOf(msg).getBytes())
msgtopic.publish(message);
println("Published data. topic: " + msgtopic.getName() + " Message: " + message)
}
client.disconnect()
}
}
/**
* A sample wordcount with MqttStream stream
*
* To work with Mqtt, Mqtt Message broker/server required.
* Mosquitto (http://mosquitto.org/) is an open source Mqtt Broker
* In ubuntu mosquitto can be installed using the command `$ sudo apt-get install mosquitto`
* Eclipse paho project provides Java library for Mqtt Client http://www.eclipse.org/paho/
* Example Java code for Mqtt Publisher and Subscriber can be found here https://bitbucket.org/mkjinesh/mqttclient
* Usage: MQTTWordCount <master> <MqttbrokerUrl> <topic>
* In local mode, <master> should be 'local[n]' with n > 1
* <MqttbrokerUrl> and <topic> describe where Mqtt publisher is running.
*
* To run this example locally, you may run publisher as
* `$ ./run-example org.apache.spark.streaming.examples.MQTTPublisher tcp://localhost:1883 foo`
* and run the example as
* `$ ./run-example org.apache.spark.streaming.examples.MQTTWordCount local[2] tcp://localhost:1883 foo`
*/
object MQTTWordCount {
def main(args: Array[String]) {
if (args.length < 3) {
System.err.println(
"Usage: MQTTWordCount <master> <MqttbrokerUrl> <topic>" +
" In local mode, <master> should be 'local[n]' with n > 1")
System.exit(1)
}
val Seq(master, brokerUrl, topic) = args.toSeq
val ssc = new StreamingContext(master, "MqttWordCount", Seconds(2), System.getenv("SPARK_HOME"),
Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val lines = ssc.mqttStream(brokerUrl, topic, StorageLevel.MEMORY_ONLY)
val words = lines.flatMap(x => x.toString.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
}
}

View file

@ -108,7 +108,10 @@ object SparkBuild extends Build {
// Shared between both core and streaming.
resolvers ++= Seq("Akka Repository" at "http://repo.akka.io/releases/"),
// For Sonatype publishing
// Shared between both examples and streaming.
resolvers ++= Seq("Mqtt Repository" at "https://repo.eclipse.org/content/repositories/paho-releases/"),
// For Sonatype publishing
resolvers ++= Seq("sonatype-snapshots" at "https://oss.sonatype.org/content/repositories/snapshots",
"sonatype-staging" at "https://oss.sonatype.org/service/local/staging/deploy/maven2/"),
@ -282,6 +285,7 @@ object SparkBuild extends Build {
"Apache repo" at "https://repository.apache.org/content/repositories/releases"
),
libraryDependencies ++= Seq(
"org.eclipse.paho" % "mqtt-client" % "0.4.0",
"org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty, excludeSnappy),
"org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty),
"com.typesafe.akka" % "akka-zeromq" % "2.0.5" excludeAll(excludeNetty),

View file

@ -136,6 +136,11 @@
<artifactId>slf4j-log4j12</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>mqtt-client</artifactId>
<version>0.4.0</version>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.version}/classes</outputDirectory>

View file

@ -462,6 +462,21 @@ class StreamingContext private (
inputStream
}
/**
* Create an input stream that receives messages pushed by a mqtt publisher.
* @param brokerUrl Url of remote mqtt publisher
* @param topic topic name to subscribe to
* @param storageLevel RDD storage level. Defaults to memory-only.
*/
def mqttStream(
brokerUrl: String,
topic: String,
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2): DStream[String] = {
val inputStream = new MQTTInputDStream[String](this, brokerUrl, topic, storageLevel)
registerInputStream(inputStream)
inputStream
}
/**
* Create a unified DStream from multiple DStreams of the same type and same interval
*/

View file

@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.dstream
import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{ Time, DStreamCheckpointData, StreamingContext }
import java.util.Properties
import java.util.concurrent.Executors
import java.io.IOException
import org.eclipse.paho.client.mqttv3.MqttCallback
import org.eclipse.paho.client.mqttv3.MqttClient
import org.eclipse.paho.client.mqttv3.MqttClientPersistence
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
import org.eclipse.paho.client.mqttv3.MqttException
import org.eclipse.paho.client.mqttv3.MqttMessage
import org.eclipse.paho.client.mqttv3.MqttTopic
import scala.collection.Map
import scala.collection.mutable.HashMap
import scala.collection.JavaConversions._
/**
* Input stream that subscribe messages from a Mqtt Broker.
* Uses eclipse paho as MqttClient http://www.eclipse.org/paho/
* @param brokerUrl Url of remote mqtt publisher
* @param topic topic name to subscribe to
* @param storageLevel RDD storage level.
*/
private[streaming]
class MQTTInputDStream[T: ClassManifest](
@transient ssc_ : StreamingContext,
brokerUrl: String,
topic: String,
storageLevel: StorageLevel
) extends NetworkInputDStream[T](ssc_) with Logging {
def getReceiver(): NetworkReceiver[T] = {
new MQTTReceiver(brokerUrl, topic, storageLevel)
.asInstanceOf[NetworkReceiver[T]]
}
}
private[streaming]
class MQTTReceiver(brokerUrl: String,
topic: String,
storageLevel: StorageLevel
) extends NetworkReceiver[Any] {
lazy protected val blockGenerator = new BlockGenerator(storageLevel)
def onStop() {
blockGenerator.stop()
}
def onStart() {
blockGenerator.start()
// Set up persistence for messages
var peristance: MqttClientPersistence = new MemoryPersistence()
// Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance
var client: MqttClient = new MqttClient(brokerUrl, "MQTTSub", peristance)
// Connect to MqttBroker
client.connect()
// Subscribe to Mqtt topic
client.subscribe(topic)
// Callback automatically triggers as and when new message arrives on specified topic
var callback: MqttCallback = new MqttCallback() {
// Handles Mqtt message
override def messageArrived(arg0: String, arg1: MqttMessage) {
blockGenerator += new String(arg1.getPayload())
}
override def deliveryComplete(arg0: IMqttDeliveryToken) {
}
override def connectionLost(arg0: Throwable) {
logInfo("Connection lost " + arg0)
}
}
// Set up callback for MqttClient
client.setCallback(callback)
}
}