---
layout: global
title: Spark Streaming Programming Guide
---
* This will become a table of contents (this text will be scraped).
{:toc}
# Overview
A Spark Streaming application is very similar to a Spark application; it consists of a *driver program* that runs the user's `main` function and continuously executes various *parallel operations* on input streams of data. The main abstraction Spark Streaming provides is a *discretized stream* (DStream), which is a continuous sequence of RDDs (distributed collections of elements) representing a continuous stream of data. DStreams can be created from live incoming data (such as data from a socket, Kafka, etc.) or they can be generated by transforming existing DStreams using parallel operators like map, reduce, and window. The basic processing model is as follows:

(i) While a Spark Streaming driver program is running, the system receives data from various sources and divides the data into batches. Each batch of data is treated as an RDD, that is, an immutable, parallel collection of data. These input RDDs are automatically persisted in memory (serialized by default) and replicated to two nodes for fault-tolerance. This sequence of RDDs is collectively referred to as an InputDStream.

(ii) Data received by InputDStreams is processed using DStream operations. Since all data is represented as RDDs and all DStream operations as RDD operations, data is automatically recovered in the event of node failures.

This guide shows how to start programming with DStreams.
# Initializing Spark Streaming
The first thing a Spark Streaming program must do is create a `StreamingContext` object, which tells Spark how to access a cluster. A `StreamingContext` can be created by using
{% highlight scala %}
new StreamingContext(master, jobName, batchDuration)
{% endhighlight %}
The `master` parameter is either the [Mesos master URL](running-on-mesos.html) (for running on a cluster) or the special "local" string (for local mode) that is used to create a Spark Context. For more information about this, please refer to the [Spark programming guide](scala-programming-guide.html). The `jobName` is the name of the streaming job, the same as the jobName used in SparkContext; it is used to identify this job in the Mesos web UI. The `batchDuration` is the size of the batches (as explained earlier). This must be set carefully so that the cluster can keep up with the processing of the data streams. Starting with something conservative like 5 seconds may be a good choice. See the [Performance Tuning](#setting-the-right-batch-size) section for a detailed discussion.
This constructor creates a SparkContext object using the given `master` and `jobName` parameters. However, if you already have a SparkContext or you need to create a custom SparkContext by specifying a list of JARs, then a StreamingContext can be created from the existing SparkContext, by using
{% highlight scala %}
new StreamingContext(sparkContext, batchDuration)
{% endhighlight %}
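For example, a minimal sketch of creating a context that runs locally with 5-second batches might look like the following. The `local[2]` master string, the job name, and the use of the `Seconds` helper from `spark.streaming` are illustrative assumptions here:
{% highlight scala %}
import spark.streaming.{Seconds, StreamingContext}

// Run locally with two threads (one to receive data, one to process it)
// and divide the incoming data into 5-second batches.
val ssc = new StreamingContext("local[2]", "MyStreamingJob", Seconds(5))
{% endhighlight %}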
# Attaching Input Sources - InputDStreams
The StreamingContext is used to create InputDStreams from input sources:
{% highlight scala %}
// Assuming ssc is the StreamingContext
ssc.networkStream(hostname, port) // Creates a stream that uses a TCP socket to read data from hostname:port
ssc.textFileStream(directory) // Creates a stream by monitoring and processing new files in an HDFS directory
{% endhighlight %}
A complete list of input sources is available in the [StreamingContext API documentation](api/streaming/index.html#spark.streaming.StreamingContext). Data received from these sources can be processed using DStream operations, which are explained next.
# DStream Operations
Once an input DStream has been created, you can transform it using _DStream operators_. Most of these operators return new DStreams which you can further transform. Eventually, you'll need to call an _output operator_, which forces evaluation of the DStream by writing data out to an external source.
## Transformations
DStreams support many of the transformations available on normal Spark RDDs:
| Transformation | Meaning |
| -------------- | ------- |
| `map(func)` | Returns a new DStream formed by passing each element of the source through a function `func`. |
| `filter(func)` | Returns a new stream formed by selecting those elements of the source on which `func` returns true. |
| `flatMap(func)` | Similar to map, but each input item can be mapped to 0 or more output items (so `func` should return a `Seq` rather than a single item). |
| `mapPartitions(func)` | Similar to map, but runs separately on each partition (block) of the DStream, so `func` must be of type `Iterator[T] => Iterator[U]` when running on a DStream of type T. |
| `union(otherStream)` | Returns a new stream that contains the union of the elements in the source stream and the argument. |
| `groupByKey([numTasks])` | When called on a stream of (K, V) pairs, returns a stream of (K, Seq[V]) pairs. **Note:** By default, this uses only 8 parallel tasks to do the grouping. You can pass an optional `numTasks` argument to set a different number of tasks. |
| `reduceByKey(func, [numTasks])` | When called on a stream of (K, V) pairs, returns a stream of (K, V) pairs where the values for each key are aggregated using the given reduce function. Like in `groupByKey`, the number of reduce tasks is configurable through an optional second argument. |
| `join(otherStream, [numTasks])` | When called on streams of type (K, V) and (K, W), returns a stream of (K, (V, W)) pairs with all pairs of elements for each key. |
| `cogroup(otherStream, [numTasks])` | When called on DStreams of type (K, V) and (K, W), returns a DStream of (K, Seq[V], Seq[W]) tuples. |
| `reduce(func)` | Returns a new DStream of single-element RDDs by aggregating the elements of the stream using a function `func` (which takes two arguments and returns one). The function should be associative so that it can be computed correctly in parallel. |
| `transform(func)` | Returns a new DStream by applying `func` (an RDD-to-RDD function) to every RDD of the stream. This can be used to do arbitrary RDD operations on the DStream. |
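As a small illustration of these operators, suppose `lines` is a hypothetical DStream of text lines (for example, one created with `ssc.networkStream` as shown earlier). A sketch of counting words in each batch might look like:
{% highlight scala %}
import spark.streaming.StreamingContext._  // brings in the (K, V) pair operations such as reduceByKey

// `lines` is assumed to be an existing DStream[String]
val words = lines.flatMap(line => line.split(" "))   // split each line into words
val pairs = words.map(word => (word, 1))             // form (word, 1) pairs
val wordCounts = pairs.reduceByKey(_ + _)            // count occurrences of each word in every batch
{% endhighlight %}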
Spark Streaming features windowed computations, which allow you to report statistics over a sliding window of data. All window functions take a `windowDuration` parameter, which is the width of the window, and a `slideDuration` parameter, which is the frequency at which the window is computed. Both must be multiples of the batch interval:
| Transformation | Meaning |
| -------------- | ------- |
| `window(windowDuration, slideDuration)` | Returns a new stream which is computed based on windowed batches of the source stream. `windowDuration` is the width of the window and `slideDuration` is the frequency at which the window is computed. Both must be multiples of the batch interval. |
| `countByWindow(windowDuration, slideDuration)` | Returns a sliding count of elements in the stream. `windowDuration` and `slideDuration` are exactly as defined in `window()`. |
| `reduceByWindow(func, windowDuration, slideDuration)` | Returns a new single-element stream, created by aggregating elements in the stream over a sliding interval using `func`. The function should be associative so that it can be computed correctly in parallel. `windowDuration` and `slideDuration` are exactly as defined in `window()`. |
| `groupByKeyAndWindow(windowDuration, slideDuration, [numTasks])` | When called on a stream of (K, V) pairs, returns a stream of (K, Seq[V]) pairs over a sliding window. **Note:** By default, this uses only 8 parallel tasks to do the grouping. You can pass an optional `numTasks` argument to set a different number of tasks. `windowDuration` and `slideDuration` are exactly as defined in `window()`. |
| `reduceByKeyAndWindow(func, windowDuration, slideDuration, [numTasks])` | When called on a stream of (K, V) pairs, returns a stream of (K, V) pairs where the values for each key are aggregated using the given reduce function over batches within a sliding window. Like in `groupByKeyAndWindow`, the number of reduce tasks is configurable through an optional argument. `windowDuration` and `slideDuration` are exactly as defined in `window()`. |
| `countByKeyAndWindow(windowDuration, slideDuration, [numTasks])` | When called on a stream of (K, V) pairs, returns a stream of (K, Int) pairs where the value for each key is its count within a sliding window. Like in `groupByKeyAndWindow`, the number of reduce tasks is configurable through an optional argument. `windowDuration` and `slideDuration` are exactly as defined in `window()`. |
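Continuing the hypothetical word-count sketch from above, counting words over the last 30 seconds of data every 10 seconds could be written as follows (using the `reduceByKeyAndWindow` signature from the table above; both durations are multiples of the assumed 5-second batch interval):
{% highlight scala %}
import spark.streaming.Seconds

// `pairs` is the hypothetical DStream[(String, Int)] from the earlier sketch.
// Aggregate counts over a 30-second window, recomputed every 10 seconds.
val windowedWordCounts =
  pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
{% endhighlight %}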
A complete list of DStream operations is available in the API documentation of [DStream](api/streaming/index.html#spark.streaming.DStream) and [PairDStreamFunctions](api/streaming/index.html#spark.streaming.PairDStreamFunctions).
## Output Operations
When an output operator is called, it triggers the computation of a stream. Currently the following output operators are defined:
| Operator | Meaning |
| -------- | ------- |
| `foreach(func)` | The fundamental output operator. Applies a function, `func`, to each RDD generated from the stream. This function should have side effects, such as printing output, saving the RDD to external files, or writing it over the network to an external system. |
| `print()` | Prints the first ten elements of every batch of data in the DStream on the driver. |
| `saveAsObjectFiles(prefix, [suffix])` | Save this DStream's contents as SequenceFiles of serialized objects. The file name at each batch interval is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS[.suffix]". |
| `saveAsTextFiles(prefix, [suffix])` | Save this DStream's contents as text files. The file name at each batch interval is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS[.suffix]". |
| `saveAsHadoopFiles(prefix, [suffix])` | Save this DStream's contents as Hadoop files. The file name at each batch interval is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS[.suffix]". |
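For instance, still using the hypothetical `wordCounts` stream from the earlier sketches, printing each batch and also saving it as text files might look like:
{% highlight scala %}
// Print the first ten elements of each batch of counts on the driver.
wordCounts.print()

// Also save each batch as text files named "counts-TIME_IN_MS.txt"
// (the prefix "counts" and suffix "txt" are illustrative choices).
wordCounts.saveAsTextFiles("counts", "txt")
{% endhighlight %}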
## DStream Persistence
Similar to RDDs, DStreams also allow developers to persist the stream's data in memory. That is, using the `persist()` method on a DStream will automatically persist every RDD of that DStream in memory. This is useful if the data in the DStream will be computed over multiple times (e.g., multiple DStream operations on the same data). For window-based operations like `reduceByWindow` and `reduceByKeyAndWindow` and state-based operations like `updateStateByKey`, this is implicitly true. Hence, DStreams generated by window-based operations are automatically persisted in memory, without the developer calling `persist()`.
Note that, unlike RDDs, the default persistence level of DStreams keeps the data serialized in memory. This is further discussed in the [Performance Tuning](#memory-tuning) section. More information on different persistence levels can be found in [Spark Programming Guide](scala-programming-guide.html#rdd-persistence).
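For example, if the hypothetical `wordCounts` stream above were consumed by more than one operation, persisting it is a one-line call:
{% highlight scala %}
// Persist every RDD generated by this DStream in memory (serialized by default),
// so multiple operations on the same data do not recompute it.
wordCounts.persist()
{% endhighlight %}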
# Starting the Streaming computation
All the above DStream operations are completely lazy; that is, they start executing only after the context is started by using
{% highlight scala %}
ssc.start()
{% endhighlight %}
Conversely, the computation can be stopped by using
{% highlight scala %}
ssc.stop()
{% endhighlight %}
# Example - NetworkWordCount.scala
A good example to start with is `spark.streaming.examples.NetworkWordCount`. This example counts the words received from a network server every second. Given below are the relevant sections of the source code. You can find the full source code in
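A minimal sketch of what the core of such a program might look like, assembled from the pieces shown earlier in this guide (the real example's argument handling and exact class layout may differ):
{% highlight scala %}
import spark.streaming.{Seconds, StreamingContext}
import spark.streaming.StreamingContext._

object NetworkWordCount {
  def main(args: Array[String]) {
    // Illustrative argument layout: args(0) = master URL, args(1) = hostname, args(2) = port
    val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1))

    // Create a stream of text received over a TCP socket (see the input sources section above)
    val lines = ssc.networkStream(args(1), args(2).toInt)

    // Count the words in each 1-second batch and print the counts
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
    wordCounts.print()

    // Start the streaming computation
    ssc.start()
  }
}
{% endhighlight %}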