---
layout: global
title: Tutorial - Spark Streaming, Plugging in a Custom Receiver
---
A "Spark Streaming" receiver can be a simple network stream, streams of messages from a message queue, files etc. A receiver can also assume roles more than just receiving data like filtering, preprocessing, to name a few of the possibilities. The api to plug-in any user defined custom receiver is thus provided to encourage development of receivers which may be well suited to ones specific need.
This guide shows the programming model and features by walking through a simple sample receiver and corresponding Spark Streaming application.
### Write a simple receiver
Start by implementing a [NetworkReceiver](#References).
The following is a simple socket text-stream receiver.
{% highlight scala %}
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket

import spark.storage.StorageLevel
import spark.streaming.dstream.NetworkReceiver

class SocketTextStreamReceiver(host: String, port: Int)
  extends NetworkReceiver[String] {

  protected lazy val blocksGenerator: BlockGenerator =
    new BlockGenerator(StorageLevel.MEMORY_ONLY_SER_2)

  protected def onStart() = {
    blocksGenerator.start()
    val socket = new Socket(host, port)
    val dataInputStream = new BufferedReader(
      new InputStreamReader(socket.getInputStream(), "UTF-8"))
    var data: String = dataInputStream.readLine()
    while (data != null) {
      // Push each received line into the block generator
      blocksGenerator += data
      data = dataInputStream.readLine()
    }
  }

  protected def onStop() {
    blocksGenerator.stop()
  }
}
{% endhighlight %}
All we did here is extend NetworkReceiver and call the block generator's API method (i.e. `+=`) to push our blocks of data. Please refer to the Scaladoc of NetworkReceiver for more details.
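As the introduction notes, a receiver can also filter or preprocess data before handing it downstream. Below is a minimal sketch of such a variation on the receiver above; the class name and the blank-line filter are purely illustrative, and the same imports are assumed.

{% highlight scala %}
// Hypothetical variation: drop blank lines before pushing data into the block generator.
class FilteringSocketTextStreamReceiver(host: String, port: Int)
  extends NetworkReceiver[String] {

  protected lazy val blocksGenerator: BlockGenerator =
    new BlockGenerator(StorageLevel.MEMORY_ONLY_SER_2)

  protected def onStart() = {
    blocksGenerator.start()
    val socket = new Socket(host, port)
    val reader = new BufferedReader(
      new InputStreamReader(socket.getInputStream(), "UTF-8"))
    var data = reader.readLine()
    while (data != null) {
      // Only push non-blank lines
      if (data.trim.nonEmpty) {
        blocksGenerator += data
      }
      data = reader.readLine()
    }
  }

  protected def onStop() {
    blocksGenerator.stop()
  }
}
{% endhighlight %}

Any per-record transformation can be applied at this point, at the cost of doing that work on the receiver rather than in the DStream operations.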
### An Actor as Receiver
Start by implementing an [Actor](#References).
The following is a simple socket text-stream receiver, which is admittedly overly simplified thanks to Akka's socket IO API.
{% highlight scala %}
import akka.actor.{Actor, IO, IOManager}
import akka.util.ByteString

import spark.streaming.receivers.Receiver

class SocketTextStreamReceiver(host: String,
  port: Int,
  bytesToString: ByteString => String) extends Actor with Receiver {

  override def preStart = IOManager(context.system).connect(host, port)

  def receive = {
    case IO.Read(socket, bytes) => pushBlock(bytesToString(bytes))
  }
}
{% endhighlight %}
All we did here is mix in the Receiver trait and call the pushBlock API method to push our blocks of data. Please refer to the Scaladoc of Receiver for more details.
### A sample Spark application
* First, create a Spark Streaming context with the master URL and batch duration. (A consolidated sketch of all the steps is given after this list.)
{% highlight scala %}
val ssc = new StreamingContext(master, "WordCountCustomStreamSource",
  Seconds(batchDuration))
{% endhighlight %}
* Plug the custom receiver into the Spark Streaming context and create a DStream.
{% highlight scala %}
val lines = ssc.networkStream[String](new SocketTextStreamReceiver(
  "localhost", 8445))
{% endhighlight %}
* Or plug the actor-based receiver into the Spark Streaming context and create a DStream.
{% highlight scala %}
val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
  "localhost", 8445, z => z.utf8String)), "SocketReceiver")
{% endhighlight %}
* Process it.
{% highlight scala %}
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
{% endhighlight %}
* After processing it, the stream can be tested using the netcat utility.

        $ nc -l localhost 8445
        hello world
        hello hello
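Putting these steps together, a minimal end-to-end sketch of the word-count application might look like the following. The `local[2]` master URL, the 2-second batch duration and the object name are only illustrative, and the actor-based `SocketTextStreamReceiver` defined above (with its imports) is assumed to be in scope.

{% highlight scala %}
import akka.actor.Props

// Package names assume the pre-Apache `spark.*` namespace used elsewhere in this guide.
import spark.streaming.{Seconds, StreamingContext}

object WordCountCustomStreamSource {
  def main(args: Array[String]) {
    // Create the context with a 2 second batch size (illustrative values)
    val ssc = new StreamingContext("local[2]", "WordCountCustomStreamSource", Seconds(2))

    // Plug in the actor-based receiver and create a DStream of lines
    val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
      "localhost", 8445, z => z.utf8String)), "SocketReceiver")

    // Count the words in each batch and print the result
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
  }
}
{% endhighlight %}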
## Multiple homogeneous/heterogeneous receivers
A DStream union operation is provided for taking the union of multiple input streams.
{% highlight scala %}
val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
  "localhost", 8445, z => z.utf8String)), "SocketReceiver")

// Another socket stream receiver
val lines2 = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
  "localhost", 8446, z => z.utf8String)), "SocketReceiver")
val union = lines.union(lines2)
{% endhighlight %}
The above stream can then be processed as described earlier.
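For example, the same word-count pipeline from the sample application applies directly to the unioned stream:

{% highlight scala %}
val words = union.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
{% endhighlight %}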
_A more comprehensive example is provided in the Spark Streaming examples._
## References
1. [Akka Actor documentation](http://doc.akka.io/docs/akka/2.0.5/scala/actors.html)
2. [NetworkReceiver](http://spark.incubator.apache.org/docs/latest/api/streaming/index.html#spark.streaming.dstream.NetworkReceiver)