example for demonstrating ZeroMQ stream

This commit is contained in:
Prashant Sharma 2013-02-19 19:42:14 +05:30
parent f7d3e309cb
commit 8d44480d84
2 changed files with 77 additions and 8 deletions

View file

@ -0,0 +1,70 @@
package spark.streaming.examples
import akka.actor.ActorSystem
import akka.actor.actorRef2Scala
import akka.zeromq._
import spark.streaming.{ Seconds, StreamingContext }
import spark.streaming.StreamingContext._
import akka.zeromq.Subscribe
/**
* A simple publisher for demonstration purposes, repeatedly publishes random Messages
* every one second.
*/
object SimpleZeroMQPublisher {
def main(args: Array[String]) = {
if (args.length < 2) {
System.err.println("Usage: SimpleZeroMQPublisher <zeroMQUrl> <topic> ")
System.exit(1)
}
val Seq(url, topic) = args.toSeq
val acs: ActorSystem = ActorSystem()
val pubSocket = ZeroMQExtension(acs).newSocket(SocketType.Pub, Bind(url))
val messages: Array[String] = Array("words ", "may ", "count ")
while (true) {
Thread.sleep(1000)
pubSocket ! ZMQMessage(Frame(topic) :: messages.map(x => Frame(x.getBytes)).toList)
}
acs.awaitTermination()
}
}
/**
* A sample wordcount with ZeroMQStream stream
*
* Usage: WordCountZeroMQ <master> <zeroMQurl> <topic>
* In local mode, <master> should be 'local[n]' with n > 1
* <zeroMQurl> and <topic> describe where zeroMq publisher is running.
*
* To run this example locally, you may run publisher as
* `$ ./run spark.streaming.examples.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar`
* and then run the example
* `$ ./run spark.streaming.examples.ZeroMQWordCount local[2] tcp://127.0.1.1:1234 foo`
*/
object ZeroMQWordCount {
def main(args: Array[String]) {
if (args.length < 3) {
System.err.println(
"Usage: WordCountZeroMQ <master> <zeroMQurl> <topic>" +
"In local mode, <master> should be 'local[n]' with n > 1")
System.exit(1)
}
val Seq(master, url, topic) = args.toSeq
// Create the context and set the batch size
val ssc = new StreamingContext(master, "ZeroMQWordCount", Seconds(2))
def bytesToStringIterator(x: Seq[Seq[Byte]]) = (x.map(x => new String(x.toArray))).iterator
//For this stream, a zeroMQ publisher should be running.
val lines = ssc.zeroMQStream(url, Subscribe(topic), bytesToStringIterator)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
}
}

View file

@ -163,7 +163,7 @@ class StreamingContext private (
* @param props Props object defining creation of the actor
* @param name Name of the actor
* @param storageLevel RDD storage level. Defaults to memory-only.
*
*
* @note An important point to note:
* Since Actor may exist outside the spark framework, It is thus user's responsibility
* to ensure the type safety, i.e parametrized type of data received and actorStream
@ -181,9 +181,9 @@ class StreamingContext private (
* @param publisherUrl Url of remote zeromq publisher
* @param zeroMQ topic to subscribe to
* @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
* of byte thus it needs the converter(which might be deserializer of bytes)
* to translate from sequence of sequence of bytes, where sequence refer to a frame
* and sub sequence refer to its payload.
* of byte thus it needs the converter(which might be deserializer of bytes)
* to translate from sequence of sequence of bytes, where sequence refer to a frame
* and sub sequence refer to its payload.
* @param storageLevel RDD storage level. Defaults to memory-only.
*/
def zeroMQStream[T: ClassManifest](publisherUrl:String,
@ -191,11 +191,11 @@ class StreamingContext private (
bytesToObjects: Seq[Seq[Byte]] Iterator[T],
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy): DStream[T] = {
actorStream(Props(new ZeroMQReceiver(publisherUrl,subscribe,bytesToObjects)),
actorStream(Props(new ZeroMQReceiver(publisherUrl,subscribe,bytesToObjects)),
"ZeroMQReceiver", storageLevel, supervisorStrategy)
}
/**
* Create an input stream that pulls messages form a Kafka Broker.
* @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
@ -500,4 +500,3 @@ object StreamingContext {
new Path(sscCheckpointDir, UUID.randomUUID.toString).toString
}
}