a975a19f21
- SPARK-1558: Updated custom receiver guide to match it with the new API - SPARK-1504: Added deployment and monitoring subsection to streaming - SPARK-1505: Added migration guide for migrating from 0.9.x and below to Spark 1.0 - Updated various Java streaming examples to use JavaReceiverInputDStream to highlight the API change. - Removed the requirement for cleaner ttl from streaming guide Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #652 from tdas/doc-fix and squashes the following commits: cb4f4b7 [Tathagata Das] Possible fix for flaky graceful shutdown test. ab71f7f [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into doc-fix 8d6ff9b [Tathagata Das] Addded migration guide to Spark Streaming. 7d171df [Tathagata Das] Added reference to JavaReceiverInputStream in examples and streaming guide. 49edd7c [Tathagata Das] Change java doc links to use Java docs. 11528d7 [Tathagata Das] Updated links on index page. ff80970 [Tathagata Das] More updates to streaming guide. 4dc42e9 [Tathagata Das] Added monitoring and other documentation in the streaming guide. 14c6564 [Tathagata Das] Updated custom receiver guide.
222 lines
7.6 KiB
Markdown
---
layout: global
title: Spark Streaming Custom Receivers
---

Spark Streaming can receive streaming data from any arbitrary data source beyond
the ones for which it has built-in support (that is, beyond Flume, Kafka, files, sockets, etc.).
This requires the developer to implement a *receiver* that is customized for receiving data from
the data source concerned. This guide walks through the process of implementing a custom receiver
and using it in a Spark Streaming application.

### Implementing a Custom Receiver

This starts with implementing a [Receiver](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver).
A custom receiver must extend this abstract class by implementing two methods:

- `onStart()`: Things to do to start receiving data.
- `onStop()`: Things to do to stop receiving data.

Note that `onStart()` and `onStop()` must not block indefinitely. Typically, `onStart()` starts the threads
that are responsible for receiving the data, and `onStop()` ensures that those threads stop
receiving. The receiving threads can also use `isStopped()`, a `Receiver` method, to check whether they
should stop receiving data.

Once data is received, it can be stored inside Spark
by calling `store(data)`, which is a method provided by the
[Receiver](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver) class.
There are a number of flavours of `store()`, which allow you to store the received data
record-at-a-time or as a whole collection of objects / serialized bytes.

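To make the difference between the record-at-a-time and whole-collection flavours concrete, here is a minimal sketch that runs without Spark. `RecordSink`, `CountingSink`, and the two helper methods are hypothetical names introduced only for illustration; `RecordSink` is a stand-in that mimics the two call shapes and is not the Spark `Receiver` class. The batch size of 2 is likewise an arbitrary assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class StoreFlavoursSketch {

    /** Hypothetical stand-in for two store() shapes; NOT the Spark Receiver class. */
    interface RecordSink {
        void store(String record);             // record-at-a-time
        void store(Iterator<String> records);  // a whole collection in one call
    }

    /** Sink that only counts how many calls and how many records it has seen. */
    static class CountingSink implements RecordSink {
        int calls = 0;
        int records = 0;

        @Override public void store(String record) {
            calls++;
            records++;
        }

        @Override public void store(Iterator<String> it) {
            calls++;
            while (it.hasNext()) {
                it.next();
                records++;
            }
        }
    }

    /** Hands every line to the sink individually. */
    static void storeOneByOne(List<String> lines, RecordSink sink) {
        for (String line : lines) {
            sink.store(line);
        }
    }

    /** Buffers up to batchSize lines, then hands each full buffer over in a single call. */
    static void storeInBatches(List<String> lines, int batchSize, RecordSink sink) {
        List<String> buffer = new ArrayList<String>();
        for (String line : lines) {
            buffer.add(line);
            if (buffer.size() == batchSize) {
                sink.store(buffer.iterator());
                buffer = new ArrayList<String>();
            }
        }
        // Flush whatever is left over after the last full batch.
        if (!buffer.isEmpty()) {
            sink.store(buffer.iterator());
        }
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a", "b", "c", "d", "e");

        CountingSink single = new CountingSink();
        storeOneByOne(lines, single);

        CountingSink batched = new CountingSink();
        storeInBatches(lines, 2, batched);

        System.out.println(single.calls + " calls vs " + batched.calls + " calls");
    }
}
```

Running `main` prints `5 calls vs 3 calls`: both sinks see the same five records, but the batched variant hands them over in fewer calls. Whether batching is worthwhile for a real receiver depends on the data source, so treat this only as an illustration of the two call shapes.
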
Any exception in the receiving threads should be caught and handled properly to avoid silent
failures of the receiver. `restart(<exception>)` will restart the receiver by
asynchronously calling `onStop()` and then calling `onStart()` after a delay.
`stop(<exception>)` will call `onStop()` and terminate the receiver. Also, `reportError(<error>)`
reports an error message to the driver (visible in the logs and UI) without stopping / restarting
the receiver.

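One way to think about the three calls is as a policy that maps each kind of failure to an action. The sketch below runs without Spark: `ReceiverControl` is a hypothetical stand-in that mimics the shape of `restart`, `stop`, and `reportError`, and the mapping from exception types to actions is purely an assumption chosen for illustration, not a recommendation from this guide.

```java
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.List;

public class ErrorHandlingSketch {

    /** Hypothetical stand-in for the three calls described above; NOT the Spark Receiver class. */
    interface ReceiverControl {
        void restart(String message, Throwable error);
        void stop(String message, Throwable error);
        void reportError(String message, Throwable error);
    }

    /** Records which call was made so that the policy can be inspected. */
    static class RecordingControl implements ReceiverControl {
        final List<String> calls = new ArrayList<String>();

        @Override public void restart(String message, Throwable error) { calls.add("restart"); }
        @Override public void stop(String message, Throwable error) { calls.add("stop"); }
        @Override public void reportError(String message, Throwable error) { calls.add("reportError"); }
    }

    /** Illustrative policy (an assumption): choose an action based on the failure type. */
    static void handle(Throwable t, ReceiverControl control) {
        if (t instanceof ConnectException) {
            // The server may come back, so try again later.
            control.restart("Could not connect, retrying", t);
        } else if (t instanceof NumberFormatException) {
            // One bad record should not take the receiver down.
            control.reportError("Skipping malformed record", t);
        } else {
            // Anything else is treated as unrecoverable in this sketch.
            control.stop("Unrecoverable error", t);
        }
    }

    public static void main(String[] args) {
        RecordingControl control = new RecordingControl();
        handle(new ConnectException("refused"), control);
        handle(new NumberFormatException("abc"), control);
        handle(new IllegalStateException("bad config"), control);
        System.out.println(control.calls);
    }
}
```

Running `main` prints `[restart, reportError, stop]`. A simpler policy that restarts on every error is equally valid; the point is only that each failure path ends in an explicit call rather than a silently dead thread.
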
The following is a custom receiver that receives a stream of text over a socket. It treats
`\n`-delimited lines in the text stream as records and stores them with Spark. If the receiving thread
encounters any error while connecting or receiving, the receiver is restarted to make another attempt to connect.

<div class="codetabs">
<div data-lang="scala" markdown="1" >

{% highlight scala %}

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket

import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class CustomReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    // There is nothing much to do as the thread calling receive()
    // is designed to stop by itself once isStopped() returns true
  }

  /** Create a socket connection and receive data until receiver is stopped */
  private def receive() {
    var socket: Socket = null
    var userInput: String = null
    try {
      // Connect to host:port
      socket = new Socket(host, port)

      // Until stopped or connection broken continue reading
      val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"))
      userInput = reader.readLine()
      while(!isStopped && userInput != null) {
        store(userInput)
        userInput = reader.readLine()
      }
      reader.close()
      socket.close()

      // Restart in an attempt to connect again when server is active again
      restart("Trying to connect again")
    } catch {
      case e: java.net.ConnectException =>
        // restart if could not connect to server
        restart("Error connecting to " + host + ":" + port, e)
      case t: Throwable =>
        // restart if there is any other error
        restart("Error receiving data", t)
    }
  }
}

{% endhighlight %}

</div>
<div data-lang="java" markdown="1">

{% highlight java %}

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.ConnectException;
import java.net.Socket;

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

public class JavaCustomReceiver extends Receiver<String> {

  String host = null;
  int port = -1;

  public JavaCustomReceiver(String host_ , int port_) {
    super(StorageLevel.MEMORY_AND_DISK_2());
    host = host_;
    port = port_;
  }

  public void onStart() {
    // Start the thread that receives data over a connection
    new Thread() {
      @Override public void run() {
        receive();
      }
    }.start();
  }

  public void onStop() {
    // There is nothing much to do as the thread calling receive()
    // is designed to stop by itself once isStopped() returns true
  }

  /** Create a socket connection and receive data until receiver is stopped */
  private void receive() {
    Socket socket = null;
    String userInput = null;

    try {
      // connect to the server
      socket = new Socket(host, port);

      // Decode as UTF-8, matching the Scala version of this receiver
      BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"));

      // Until stopped or connection broken continue reading
      while (!isStopped() && (userInput = reader.readLine()) != null) {
        System.out.println("Received data '" + userInput + "'");
        store(userInput);
      }
      reader.close();
      socket.close();

      // Restart in an attempt to connect again when server is active again
      restart("Trying to connect again");
    } catch(ConnectException ce) {
      // restart if could not connect to server
      restart("Could not connect", ce);
    } catch(Throwable t) {
      // restart if there is any other error
      restart("Error receiving data", t);
    }
  }
}

{% endhighlight %}

</div>
</div>

### Using the custom receiver in a Spark Streaming application

The custom receiver can be used in a Spark Streaming application by calling
`streamingContext.receiverStream(<instance of custom receiver>)`. This creates an
input DStream from the data received by the custom receiver instance, as shown below:

<div class="codetabs">
<div data-lang="scala" markdown="1" >

{% highlight scala %}
// Assuming ssc is the StreamingContext
val customReceiverStream = ssc.receiverStream(new CustomReceiver(host, port))
val words = customReceiverStream.flatMap(_.split(" "))
...
{% endhighlight %}

The full source code is in the example [CustomReceiver.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala).

</div>
<div data-lang="java" markdown="1">

{% highlight java %}
// Assuming ssc is the JavaStreamingContext
JavaReceiverInputDStream<String> customReceiverStream = ssc.receiverStream(new JavaCustomReceiver(host, port));
JavaDStream<String> words = customReceiverStream.flatMap(new FlatMapFunction<String, String>() { ... });
...
{% endhighlight %}

The full source code is in the example [JavaCustomReceiver.java](https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/streaming/examples/JavaCustomReceiver.java).

</div>
</div>

### Implementing and Using a Custom Actor-based Receiver

Custom [Akka Actors](http://doc.akka.io/docs/akka/2.2.4/scala/actors.html) can also be used to
receive data. The [`ActorHelper`](api/scala/index.html#org.apache.spark.streaming.receiver.ActorHelper)
trait can be mixed into any Akka actor, which allows received data to be stored in Spark using
the `store(...)` methods. The supervisor strategy of this actor can be configured to handle failures, etc.

{% highlight scala %}
import akka.actor.Actor
import org.apache.spark.streaming.receiver.ActorHelper

class CustomActor extends Actor with ActorHelper {
  // Store every String message that this actor receives
  def receive = {
    case data: String => store(data)
  }
}
{% endhighlight %}

A new input stream can then be created with this custom actor:

{% highlight scala %}
// Assuming ssc is the StreamingContext and akka.actor.Props is imported
val lines = ssc.actorStream[String](Props(new CustomActor()), "CustomReceiver")
{% endhighlight %}

See [ActorWordCount.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala)
for an end-to-end example.