187a3d5385
This improves the Spark Streaming Guides by fixing broken links, rewording confusing sections, fixing typos, adding missing words, etc.
Author: Mike Dusenberry <dusenberrymw@gmail.com>
Closes #6801 from dusenberrymw/SPARK-8343_Improve_Spark_Streaming_Guides_MERGED and squashes the following commits:
6688090 [Mike Dusenberry] Improvements to the Spark Streaming Custom Receiver Guide, including slight rewording of confusing sections, and fixing typos & missing words.
436fbd8 [Mike Dusenberry] Bunch of improvements to the Spark Streaming Guide, including fixing broken links, slight rewording of confusing sections, fixing typos & missing words, etc.
(cherry picked from commit 35d1267cf8
)
Signed-off-by: Reynold Xin <rxin@databricks.com>
282 lines
11 KiB
Markdown
282 lines
11 KiB
Markdown
---
|
|
layout: global
|
|
title: Spark Streaming Custom Receivers
|
|
---
|
|
|
|
Spark Streaming can receive streaming data from any arbitrary data source beyond
|
|
the ones for which it has built-in support (that is, beyond Flume, Kafka, Kinesis, files, sockets, etc.).
|
|
This requires the developer to implement a *receiver* that is customized for receiving data from
|
|
the concerned data source. This guide walks through the process of implementing a custom receiver
|
|
and using it in a Spark Streaming application. Note that custom receivers can be implemented
|
|
in Scala or Java.
|
|
|
|
## Implementing a Custom Receiver
|
|
|
|
This starts with implementing a **Receiver**
|
|
([Scala doc](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver),
|
|
[Java doc](api/java/org/apache/spark/streaming/receiver/Receiver.html)).
|
|
A custom receiver must extend this abstract class by implementing two methods
|
|
|
|
- `onStart()`: Things to do to start receiving data.
|
|
- `onStop()`: Things to do to stop receiving data.
|
|
|
|
Both `onStart()` and `onStop()` must not block indefinitely. Typically, `onStart()` would start the threads
|
|
that are responsible for receiving the data, and `onStop()` would ensure that these threads receiving the data
|
|
are stopped. The receiving threads can also use `isStopped()`, a `Receiver` method, to check whether they
|
|
should stop receiving data.
|
|
|
|
Once the data is received, that data can be stored inside Spark
|
|
by calling `store(data)`, which is a method provided by the Receiver class.
|
|
There are a number of flavors of `store()` which allow one to store the received data
|
|
record-at-a-time or as whole collection of objects / serialized bytes. Note that the flavor of
|
|
`store()` used to implement a receiver affects its reliability and fault-tolerance semantics.
|
|
This is discussed [later](#receiver-reliability) in more detail.
|
|
|
|
Any exception in the receiving threads should be caught and handled properly to avoid silent
|
|
failures of the receiver. `restart(<exception>)` will restart the receiver by
|
|
asynchronously calling `onStop()` and then calling `onStart()` after a delay.
|
|
`stop(<exception>)` will call `onStop()` and terminate the receiver. Also, `reportError(<error>)`
|
|
reports a error message to the driver (visible in the logs and UI) without stopping / restarting
|
|
the receiver.
|
|
|
|
The following is a custom receiver that receives a stream of text over a socket. It treats
|
|
'\n' delimited lines in the text stream as records and stores them with Spark. If the receiving thread
|
|
has any error connecting or receiving, the receiver is restarted to make another attempt to connect.
|
|
|
|
<div class="codetabs">
|
|
<div data-lang="scala" markdown="1" >
|
|
|
|
{% highlight scala %}
|
|
|
|
class CustomReceiver(host: String, port: Int)
|
|
extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {
|
|
|
|
def onStart() {
|
|
// Start the thread that receives data over a connection
|
|
new Thread("Socket Receiver") {
|
|
override def run() { receive() }
|
|
}.start()
|
|
}
|
|
|
|
def onStop() {
|
|
// There is nothing much to do as the thread calling receive()
|
|
// is designed to stop by itself if isStopped() returns false
|
|
}
|
|
|
|
/** Create a socket connection and receive data until receiver is stopped */
|
|
private def receive() {
|
|
var socket: Socket = null
|
|
var userInput: String = null
|
|
try {
|
|
// Connect to host:port
|
|
socket = new Socket(host, port)
|
|
|
|
// Until stopped or connection broken continue reading
|
|
val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"))
|
|
userInput = reader.readLine()
|
|
while(!isStopped && userInput != null) {
|
|
store(userInput)
|
|
userInput = reader.readLine()
|
|
}
|
|
reader.close()
|
|
socket.close()
|
|
|
|
// Restart in an attempt to connect again when server is active again
|
|
restart("Trying to connect again")
|
|
} catch {
|
|
case e: java.net.ConnectException =>
|
|
// restart if could not connect to server
|
|
restart("Error connecting to " + host + ":" + port, e)
|
|
case t: Throwable =>
|
|
// restart if there is any other error
|
|
restart("Error receiving data", t)
|
|
}
|
|
}
|
|
}
|
|
|
|
{% endhighlight %}
|
|
|
|
</div>
|
|
<div data-lang="java" markdown="1">
|
|
|
|
{% highlight java %}
|
|
|
|
public class JavaCustomReceiver extends Receiver<String> {
|
|
|
|
String host = null;
|
|
int port = -1;
|
|
|
|
public JavaCustomReceiver(String host_ , int port_) {
|
|
super(StorageLevel.MEMORY_AND_DISK_2());
|
|
host = host_;
|
|
port = port_;
|
|
}
|
|
|
|
public void onStart() {
|
|
// Start the thread that receives data over a connection
|
|
new Thread() {
|
|
@Override public void run() {
|
|
receive();
|
|
}
|
|
}.start();
|
|
}
|
|
|
|
public void onStop() {
|
|
// There is nothing much to do as the thread calling receive()
|
|
// is designed to stop by itself if isStopped() returns false
|
|
}
|
|
|
|
/** Create a socket connection and receive data until receiver is stopped */
|
|
private void receive() {
|
|
Socket socket = null;
|
|
String userInput = null;
|
|
|
|
try {
|
|
// connect to the server
|
|
socket = new Socket(host, port);
|
|
|
|
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
|
|
|
|
// Until stopped or connection broken continue reading
|
|
while (!isStopped() && (userInput = reader.readLine()) != null) {
|
|
System.out.println("Received data '" + userInput + "'");
|
|
store(userInput);
|
|
}
|
|
reader.close();
|
|
socket.close();
|
|
|
|
// Restart in an attempt to connect again when server is active again
|
|
restart("Trying to connect again");
|
|
} catch(ConnectException ce) {
|
|
// restart if could not connect to server
|
|
restart("Could not connect", ce);
|
|
} catch(Throwable t) {
|
|
// restart if there is any other error
|
|
restart("Error receiving data", t);
|
|
}
|
|
}
|
|
}
|
|
|
|
{% endhighlight %}
|
|
|
|
</div>
|
|
</div>
|
|
|
|
|
|
## Using the custom receiver in a Spark Streaming application
|
|
|
|
The custom receiver can be used in a Spark Streaming application by using
|
|
`streamingContext.receiverStream(<instance of custom receiver>)`. This will create
|
|
an input DStream using data received by the instance of custom receiver, as shown below:
|
|
|
|
<div class="codetabs">
|
|
<div data-lang="scala" markdown="1" >
|
|
|
|
{% highlight scala %}
|
|
// Assuming ssc is the StreamingContext
|
|
val customReceiverStream = ssc.receiverStream(new CustomReceiver(host, port))
|
|
val words = lines.flatMap(_.split(" "))
|
|
...
|
|
{% endhighlight %}
|
|
|
|
The full source code is in the example [CustomReceiver.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala).
|
|
|
|
</div>
|
|
<div data-lang="java" markdown="1">
|
|
|
|
{% highlight java %}
|
|
// Assuming ssc is the JavaStreamingContext
|
|
JavaDStream<String> customReceiverStream = ssc.receiverStream(new JavaCustomReceiver(host, port));
|
|
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { ... });
|
|
...
|
|
{% endhighlight %}
|
|
|
|
The full source code is in the example [JavaCustomReceiver.java](https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java).
|
|
|
|
</div>
|
|
</div>
|
|
|
|
## Receiver Reliability
|
|
As discussed in brief in the
|
|
[Spark Streaming Programming Guide](streaming-programming-guide.html#receiver-reliability),
|
|
there are two kinds of receivers based on their reliability and fault-tolerance semantics.
|
|
|
|
1. *Reliable Receiver* - For *reliable sources* that allow sent data to be acknowledged, a
|
|
*reliable receiver* correctly acknowledges to the source that the data has been received
|
|
and stored in Spark reliably (that is, replicated successfully). Usually,
|
|
implementing this receiver involves careful consideration of the semantics of source
|
|
acknowledgements.
|
|
1. *Unreliable Receiver* - An *unreliable receiver* does *not* send acknowledgement to a source. This can be used for sources that do not support acknowledgement, or even for reliable sources when one does not want or need to go into the complexity of acknowledgement.
|
|
|
|
To implement a *reliable receiver*, you have to use `store(multiple-records)` to store data.
|
|
This flavor of `store` is a blocking call which returns only after all the given records have
|
|
been stored inside Spark. If the receiver's configured storage level uses replication
|
|
(enabled by default), then this call returns after replication has completed.
|
|
Thus it ensures that the data is reliably stored, and the receiver can now acknowledge the
|
|
source appropriately. This ensures that no data is lost when the receiver fails in the middle
|
|
of replicating data -- the buffered data will not be acknowledged and hence will be later resent
|
|
by the source.
|
|
|
|
An *unreliable receiver* does not have to implement any of this logic. It can simply receive
|
|
records from the source and insert them one-at-a-time using `store(single-record)`. While it does
|
|
not get the reliability guarantees of `store(multiple-records)`, it has the following advantages:
|
|
|
|
- The system takes care of chunking that data into appropriate sized blocks (look for block
|
|
interval in the [Spark Streaming Programming Guide](streaming-programming-guide.html)).
|
|
- The system takes care of controlling the receiving rates if the rate limits have been specified.
|
|
- Because of these two, unreliable receivers are simpler to implement than reliable receivers.
|
|
|
|
The following table summarizes the characteristics of both types of receivers
|
|
|
|
<table class="table">
|
|
<tr>
|
|
<th>Receiver Type</th>
|
|
<th>Characteristics</th>
|
|
</tr>
|
|
<tr>
|
|
<td><b>Unreliable Receivers</b></td>
|
|
<td>
|
|
Simple to implement.<br>
|
|
System takes care of block generation and rate control.
|
|
No fault-tolerance guarantees, can lose data on receiver failure.
|
|
</td>
|
|
</tr>
|
|
<tr>
|
|
<td><b>Reliable Receivers</b></td>
|
|
<td>
|
|
Strong fault-tolerance guarantees, can ensure zero data loss.<br/>
|
|
Block generation and rate control to be handled by the receiver implementation.<br/>
|
|
Implementation complexity depends on the acknowledgement mechanisms of the source.
|
|
</td>
|
|
</tr>
|
|
<tr>
|
|
<td></td>
|
|
<td></td>
|
|
</tr>
|
|
</table>
|
|
|
|
## Implementing and Using a Custom Actor-based Receiver
|
|
|
|
Custom [Akka Actors](http://doc.akka.io/docs/akka/2.2.4/scala/actors.html) can also be used to
|
|
receive data. The [`ActorHelper`](api/scala/index.html#org.apache.spark.streaming.receiver.ActorHelper)
|
|
trait can be applied on any Akka actor, which allows received data to be stored in Spark using
|
|
`store(...)` methods. The supervisor strategy of this actor can be configured to handle failures, etc.
|
|
|
|
{% highlight scala %}
|
|
class CustomActor extends Actor with ActorHelper {
|
|
def receive = {
|
|
case data: String => store(data)
|
|
}
|
|
}
|
|
{% endhighlight %}
|
|
|
|
And a new input stream can be created with this custom actor as
|
|
|
|
{% highlight scala %}
|
|
// Assuming ssc is the StreamingContext
|
|
val lines = ssc.actorStream[String](Props(new CustomActor()), "CustomReceiver")
|
|
{% endhighlight %}
|
|
|
|
See [ActorWordCount.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala)
|
|
for an end-to-end example.
|