---
layout: global
title: Spark Streaming Custom Receivers
---

A "Spark Streaming" receiver can be a simple network stream, streams of messages from a message queue, files etc. A receiver can also assume roles more than just receiving data like filtering, preprocessing, to name a few of the possibilities. The api to plug-in any user defined custom receiver is thus provided to encourage development of receivers which may be well suited to ones specific need.

This guide shows the programming model and features by walking through a simple sample receiver and corresponding Spark Streaming application.

## Writing a Simple Receiver

This approach starts with implementing a NetworkReceiver.

The following is a simple socket text-stream receiver.

{% highlight scala %}
// Assumed imports for this sketch; NetworkReceiver (with its inner
// BlockGenerator class) lives in org.apache.spark.streaming.dstream
// in this era of Spark.
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.NetworkReceiver

class SocketTextStreamReceiver(host: String, port: Int)
  extends NetworkReceiver[String] {

  // Buffers received records and pushes them into Spark as blocks.
  protected lazy val blocksGenerator: BlockGenerator =
    new BlockGenerator(StorageLevel.MEMORY_ONLY_SER_2)

  protected def onStart() = {
    blocksGenerator.start()
    val socket = new Socket(host, port)
    val dataInputStream = new BufferedReader(
      new InputStreamReader(socket.getInputStream(), "UTF-8"))
    var data: String = dataInputStream.readLine()
    while (data != null) {
      blocksGenerator += data
      data = dataInputStream.readLine()
    }
  }

  protected def onStop() {
    blocksGenerator.stop()
  }
}
{% endhighlight %}

All we did here was extend NetworkReceiver and call the block generator's API method (i.e. `+=`) to push our blocks of data. Please refer to the Scaladoc of NetworkReceiver for more details.
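To underline that `onStart()`, `onStop()` and `+=` are essentially the whole contract, here is a minimal sketch of a second receiver that synthesizes its own data instead of reading a socket. The `TickReceiver` class and its `intervalMs` parameter are illustrative, not part of the Spark API.

{% highlight scala %}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.NetworkReceiver

// Illustrative only: emits a timestamped "tick" every intervalMs milliseconds.
class TickReceiver(intervalMs: Long) extends NetworkReceiver[String] {

  protected lazy val blocksGenerator: BlockGenerator =
    new BlockGenerator(StorageLevel.MEMORY_ONLY_SER_2)

  protected def onStart() = {
    blocksGenerator.start()
    while (true) {
      blocksGenerator += "tick " + System.currentTimeMillis()
      Thread.sleep(intervalMs)
    }
  }

  protected def onStop() {
    blocksGenerator.stop()
  }
}
{% endhighlight %}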

## An Actor as Receiver

This approach starts with implementing an Actor.

The following is a simple socket text-stream receiver, kept deliberately simple by using Akka's IO API.

{% highlight scala %}
// Assumed imports for this sketch: IO and IOManager come from Akka's
// (since-superseded) akka.actor IO layer; the Receiver trait lives in
// org.apache.spark.streaming.receivers in this era of Spark.
import akka.actor.{Actor, IO, IOManager}
import akka.util.ByteString

import org.apache.spark.streaming.receivers.Receiver

class SocketTextStreamReceiver(host: String,
                               port: Int,
                               bytesToString: ByteString => String)
  extends Actor with Receiver {

  // Open the socket connection when the actor starts.
  override def preStart = IOManager(context.system).connect(host, port)

  // Push each chunk read from the socket into Spark as a block.
  def receive = {
    case IO.Read(socket, bytes) => pushBlock(bytesToString(bytes))
  }
}
{% endhighlight %}

All we did here was mix in the trait Receiver and call its pushBlock API method to push our blocks of data. Please refer to the Scaladoc of Receiver for more details.
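The receiver above ignores connection teardown entirely. As a hedged sketch, assuming the `IO.Closed` message of the same Akka IO layer (and reusing the imports above), a variant could also stop itself once the socket closes; the class name is illustrative:

{% highlight scala %}
// Sketch only: IO.Closed is assumed from the same akka.actor IO layer.
class RobustSocketTextStreamReceiver(host: String,
                                     port: Int,
                                     bytesToString: ByteString => String)
  extends Actor with Receiver {

  override def preStart = IOManager(context.system).connect(host, port)

  def receive = {
    case IO.Read(socket, bytes)   => pushBlock(bytesToString(bytes))
    case IO.Closed(socket, cause) => context.stop(self)  // connection is gone
  }
}
{% endhighlight %}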

## A Sample Spark Application

* First, create a Spark Streaming context with the master URL and batch duration.

{% highlight scala %}
val ssc = new StreamingContext(master, "WordCountCustomStreamSource", Seconds(batchDuration))
{% endhighlight %}
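For reference, a minimal sketch of the definitions the snippet above assumes; the `local[2]` master and the two-second batch duration are illustrative choices, not prescribed by the guide:

{% highlight scala %}
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Illustrative values: at least two local threads, so one can run the
// receiver while the other processes the received data.
val master = "local[2]"
val batchDuration = 2  // seconds
{% endhighlight %}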

* Plug the custom receiver into the Spark Streaming context and create a DStream.

{% highlight scala %}
val lines = ssc.networkStream[String](new SocketTextStreamReceiver("localhost", 8445))
{% endhighlight %}

* Or, plug the actor as receiver into the Spark Streaming context and create a DStream.

{% highlight scala %}
val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
  "localhost", 8445, z => z.utf8String)), "SocketReceiver")
{% endhighlight %}

* Process it.

{% highlight scala %}
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

wordCounts.print()
ssc.start()
{% endhighlight %}
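To keep the application running after `ssc.start()`, the driver can block until the streaming computation is stopped; `awaitTermination` is available on StreamingContext in Spark releases of this era.

{% highlight scala %}
// Block the driver until the streaming computation terminates.
ssc.awaitTermination()
{% endhighlight %}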

* After processing, the stream can be tested by feeding it text with the netcat utility.

    $ nc -l localhost 8445
    hello world
    hello hello
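If everything is wired up correctly, `wordCounts.print()` should emit per-batch summaries along these lines (the timestamp shown is purely illustrative):

    -------------------------------------------
    Time: 1397780400000 ms
    -------------------------------------------
    (hello,3)
    (world,1)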

## Multiple Homogeneous/Heterogeneous Receivers

A DStream union operation is provided for taking the union of multiple input streams.

{% highlight scala %}
val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
  "localhost", 8445, z => z.utf8String)), "SocketReceiver")

// Another socket stream receiver; note the distinct actor name,
// since actor names must be unique within the actor system.
val lines2 = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
  "localhost", 8446, z => z.utf8String)), "SocketReceiver2")

val union = lines.union(lines2)
{% endhighlight %}

The above stream can then be processed as described earlier.
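When the receivers are homogeneous, the pairwise unions fold naturally over a collection of streams. A minimal sketch, in which `numReceivers` and the consecutive-port scheme are illustrative:

{% highlight scala %}
val numReceivers = 3  // illustrative

// One actor-based receiver per port, each with a unique actor name.
val streams = (0 until numReceivers).map { i =>
  ssc.actorStream[String](Props(new SocketTextStreamReceiver(
    "localhost", 8445 + i, z => z.utf8String)), "SocketReceiver-" + i)
}

// Fold the pairwise unions into a single DStream.
val unioned = streams.reduce(_ union _)
{% endhighlight %}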

A more comprehensive example is provided in the Spark Streaming examples.

## References

1. Akka Actor documentation
2. NetworkReceiver Scaladoc