spark-instrumented-optimizer/docs/streaming-custom-receivers.md

129 lines
4.4 KiB
Markdown
Raw Normal View History

2013-01-22 02:58:29 -05:00
---
layout: global
2013-09-01 02:01:50 -04:00
title: Spark Streaming Custom Receivers
2013-01-22 02:58:29 -05:00
---
2013-02-24 19:32:44 -05:00
A "Spark Streaming" receiver can be a simple network stream, streams of messages from a message queue, files etc. A receiver can also assume roles more than just receiving data like filtering, preprocessing, to name a few of the possibilities. The api to plug-in any user defined custom receiver is thus provided to encourage development of receivers which may be well suited to ones specific need.
2013-01-22 02:58:29 -05:00
This guide shows the programming model and features by walking through a simple sample receiver and corresponding Spark Streaming application.
2013-09-01 02:01:50 -04:00
### Writing a Simple Receiver
2013-01-22 02:58:29 -05:00
2013-09-01 02:01:50 -04:00
This starts with implementing [NetworkReceiver](api/streaming/index.html#org.apache.spark.streaming.dstream.NetworkReceiver).
2013-01-22 02:58:29 -05:00
2013-09-01 02:01:50 -04:00
The following is a simple socket text-stream receiver.
{% highlight scala %}
2013-09-01 02:01:50 -04:00
class SocketTextStreamReceiver(host: String, port: Int(
extends NetworkReceiver[String]
{
protected lazy val blocksGenerator: BlockGenerator =
new BlockGenerator(StorageLevel.MEMORY_ONLY_SER_2)
protected def onStart() = {
blocksGenerator.start()
val socket = new Socket(host, port)
val dataInputStream = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"))
var data: String = dataInputStream.readLine()
while (data != null) {
blocksGenerator += data
data = dataInputStream.readLine()
}
}
protected def onStop() {
blocksGenerator.stop()
}
}
{% endhighlight %}
All we did here is extended NetworkReceiver and called blockGenerator's API method (i.e. +=) to push our blocks of data. Please refer to scala-docs of NetworkReceiver for more details.
2013-09-01 02:01:50 -04:00
### An Actor as Receiver
2013-01-22 02:58:29 -05:00
This starts with implementing [Actor](#References)
Following is a simple socket text-stream receiver, which is appearently overly simplified using Akka's socket.io api.
{% highlight scala %}
class SocketTextStreamReceiver (host:String,
port:Int,
bytesToString: ByteString => String) extends Actor with Receiver {
override def preStart = IOManager(context.system).connect(host, port)
def receive = {
case IO.Read(socket, bytes) => pushBlock(bytesToString(bytes))
}
}
{% endhighlight %}
All we did here is mixed in trait Receiver and called pushBlock api method to push our blocks of data. Please refer to scala-docs of Receiver for more details.
2013-09-01 02:01:50 -04:00
### A Sample Spark Application
2013-01-22 02:58:29 -05:00
* First create a Spark streaming context with master url and batchduration.
{% highlight scala %}
val ssc = new StreamingContext(master, "WordCountCustomStreamSource",
Seconds(batchDuration))
{% endhighlight %}
* Plug-in the custom receiver into the spark streaming context and create a DStream.
{% highlight scala %}
val lines = ssc.networkStream[String](new SocketTextStreamReceiver(
"localhost", 8445))
{% endhighlight %}
* OR Plug-in the actor as receiver into the spark streaming context and create a DStream.
2013-01-22 02:58:29 -05:00
{% highlight scala %}
val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
"localhost",8445, z => z.utf8String)),"SocketReceiver")
{% endhighlight %}
* Process it.
{% highlight scala %}
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
{% endhighlight %}
* After processing it, stream can be tested using the netcat utility.
$ nc -l localhost 8445
hello world
hello hello
2013-09-01 02:01:50 -04:00
## Multiple Homogeneous/Heterogeneous Receivers.
2013-01-22 02:58:29 -05:00
A DStream union operation is provided for taking union on multiple input streams.
{% highlight scala %}
val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
"localhost",8445, z => z.utf8String)),"SocketReceiver")
// Another socket stream receiver
val lines2 = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
"localhost",8446, z => z.utf8String)),"SocketReceiver")
val union = lines.union(lines2)
{% endhighlight %}
Above stream can be easily process as described earlier.
_A more comprehensive example is provided in the spark streaming examples_
## References
1.[Akka Actor documentation](http://doc.akka.io/docs/akka/2.0.5/scala/actors.html)
2013-09-01 02:01:50 -04:00
2.[NetworkReceiver](api/streaming/index.html#org.apache.spark.streaming.dstream.NetworkReceiver)