[SPARK-3952] [Streaming] [PySpark] add Python examples in Streaming Programming Guide

Add Python examples to the Streaming Programming Guide.

Also add RecoverableNetworkWordCount example.

Author: Davies Liu <davies.liu@gmail.com>
Author: Davies Liu <davies@databricks.com>

Closes #2808 from davies/pyguide and squashes the following commits:

8d4bec4 [Davies Liu] update readme
26a7e37 [Davies Liu] fix format
3821c4d [Davies Liu] address comments, add missing file
7e4bb8a [Davies Liu] add Python examples in Streaming Programming Guide
Davies Liu 2014-10-18 19:14:48 -07:00 committed by Josh Rosen
parent f406a83918
commit 05db2da7dc
5 changed files with 391 additions and 14 deletions


@ -25,8 +25,7 @@ installing via the Ruby Gem dependency manager. Since the exact HTML output
varies between versions of Jekyll and its dependencies, we list specific versions here
in some cases:
$ sudo gem install jekyll -v 1.4.3
$ sudo gem uninstall kramdown -v 1.4.1
$ sudo gem install jekyll
$ sudo gem install jekyll-redirect-from
Execute `jekyll` from the `docs/` directory. Compiling the site with Jekyll will create a directory


@ -212,6 +212,67 @@ The complete code can be found in the Spark Streaming example
[JavaNetworkWordCount]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java).
<br>
</div>
<div data-lang="python" markdown="1" >
First, we import StreamingContext, which is the main entry point for all streaming functionality. We create a local StreamingContext with two execution threads and a batch interval of 1 second.
{% highlight python %}
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# Create a local StreamingContext with two working threads and a batch interval of 1 second
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)
{% endhighlight %}
Using this context, we can create a DStream that represents streaming data from a TCP
source, specified as a hostname (e.g. `localhost`) and port (e.g. `9999`).
{% highlight python %}
# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("localhost", 9999)
{% endhighlight %}
This `lines` DStream represents the stream of data that will be received from the data
server. Each record in this DStream is a line of text. Next, we want to split the lines by
space into words.
{% highlight python %}
# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))
{% endhighlight %}
`flatMap` is a one-to-many DStream operation that creates a new DStream by
generating multiple new records from each record in the source DStream. In this case,
each line will be split into multiple words and the stream of words is represented as the
`words` DStream. Next, we want to count these words.
{% highlight python %}
# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()
{% endhighlight %}
The `words` DStream is further mapped (one-to-one transformation) to a DStream of `(word,
1)` pairs, which is then reduced to get the frequency of words in each batch of data.
Finally, `wordCounts.pprint()` will print a few of the counts generated every second.
Note that when these lines are executed, Spark Streaming only sets up the computation it
will perform when it is started, and no real processing has started yet. To start the processing
after all the transformations have been set up, we finally call
{% highlight python %}
ssc.start() # Start the computation
ssc.awaitTermination() # Wait for the computation to terminate
{% endhighlight %}
The complete code can be found in the Spark Streaming example
[NetworkWordCount]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/network_wordcount.py).
<br>
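For convenience, the snippets above can be assembled into a single runnable script, sketched below with the host and port hard-coded for brevity (the bundled network_wordcount.py takes them as command-line arguments instead, as the run instructions below show):
{% highlight python %}
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Local StreamingContext with two worker threads and a 1-second batch interval
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)

# Count the words received on a TCP socket and print the first ten counts of each batch
lines = ssc.socketTextStream("localhost", 9999)
counts = lines.flatMap(lambda line: line.split(" ")) \
              .map(lambda word: (word, 1)) \
              .reduceByKey(lambda a, b: a + b)
counts.pprint()

ssc.start()
ssc.awaitTermination()
{% endhighlight %}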
</div>
</div>
@ -236,6 +297,11 @@ $ ./bin/run-example streaming.NetworkWordCount localhost 9999
$ ./bin/run-example streaming.JavaNetworkWordCount localhost 9999
{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
{% highlight bash %}
$ ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999
{% endhighlight %}
</div>
</div>
@ -259,8 +325,11 @@ hello world
</td>
<td width="2%"></td>
<td>
<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight bash %}
# TERMINAL 2: RUNNING NetworkWordCount
$ ./bin/run-example streaming.NetworkWordCount localhost 9999
...
@ -271,6 +340,37 @@ Time: 1357008430000 ms
(world,1)
...
{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
{% highlight bash %}
# TERMINAL 2: RUNNING JavaNetworkWordCount
$ ./bin/run-example streaming.JavaNetworkWordCount localhost 9999
...
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
...
{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
{% highlight bash %}
# TERMINAL 2: RUNNING network_wordcount.py
$ ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999
...
-------------------------------------------
Time: 2014-10-14 15:25:21
-------------------------------------------
(hello,1)
(world,1)
...
{% endhighlight %}
</div>
</div>
</td>
</table>
@ -398,9 +498,34 @@ JavaSparkContext sc = ... //existing JavaSparkContext
JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(1000));
{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
A [StreamingContext](api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext) object can be created from a [SparkContext](api/python/pyspark.html#pyspark.SparkContext) object.
{% highlight python %}
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext(master, appName)
ssc = StreamingContext(sc, 1)
{% endhighlight %}
The `appName` parameter is a name for your application to show on the cluster UI.
`master` is a [Spark, Mesos or YARN cluster URL](submitting-applications.html#master-urls),
or a special __"local[\*]"__ string to run in local mode. In practice, when running on a cluster,
you will not want to hardcode `master` in the program,
but rather [launch the application with `spark-submit`](submitting-applications.html) and
receive it there. However, for local testing and unit tests, you can pass "local[\*]" to run Spark Streaming
in-process (detects the number of cores in the local system).
The batch interval must be set based on the latency requirements of your application
and available cluster resources. See the [Performance Tuning](#setting-the-right-batch-size)
section for more details.
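As a minimal sketch of this pattern (the application name is a placeholder and the 5-second batch interval is only an example), the script can omit the master entirely and let `spark-submit` supply it:
{% highlight python %}
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# No master here; spark-submit provides it, e.g. via --master local[*]
sc = SparkContext(appName="MyStreamingApp")  # placeholder application name
ssc = StreamingContext(sc, 5)                # 5-second batch interval
{% endhighlight %}
The script would then be launched with something like `./bin/spark-submit --master local[*] my_streaming_app.py`.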
</div>
</div>
After a context is defined, you have to do the following steps.
1. Define the input sources.
1. Set up the streaming computations.
1. Start the receiving and processing of data using `streamingContext.start()`.
@ -483,6 +608,9 @@ methods for creating DStreams from files and Akka actors as input sources.
<div data-lang="java" markdown="1">
streamingContext.fileStream<keyClass, valueClass, inputFormatClass>(dataDirectory);
</div>
<div data-lang="python" markdown="1">
streamingContext.textFileStream(dataDirectory)
</div>
</div>
Spark Streaming will monitor the directory `dataDirectory` and process any files created in that directory (files written in nested directories are not supported). Note that
@ -684,13 +812,30 @@ This is applied on a DStream containing words (say, the `pairs` DStream containi
JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);
{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
{% highlight python %}
def updateFunction(newValues, runningCount):
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)  # add the new values with the previous running count to get the new count
{% endhighlight %}
This is applied on a DStream containing words (say, the `pairs` DStream containing `(word,
1)` pairs in the [earlier example](#a-quick-example)).
{% highlight python %}
runningCounts = pairs.updateStateByKey(updateFunction)
{% endhighlight %}
</div>
</div>
The update function will be called for each word, with `newValues` having a sequence of 1's (from
the `(word, 1)` pairs) and the `runningCount` having the previous count. For the complete
Scala code, take a look at the example
[StatefulNetworkWordCount]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala),
and for the complete Python code, see
[stateful_network_wordcount.py]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/stateful_network_wordcount.py).
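Putting the pieces together, a hedged end-to-end sketch of stateful word counting is shown below (the checkpoint directory name is a placeholder); note that `updateStateByKey` requires checkpointing to be enabled, which is why `ssc.checkpoint(...)` appears:
{% highlight python %}
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "StatefulNetworkWordCount")
ssc = StreamingContext(sc, 1)
ssc.checkpoint("checkpoint")  # placeholder checkpoint directory; required by updateStateByKey

def updateFunction(newValues, runningCount):
    return sum(newValues) + (runningCount or 0)

lines = ssc.socketTextStream("localhost", 9999)
runningCounts = lines.flatMap(lambda line: line.split(" ")) \
                     .map(lambda word: (word, 1)) \
                     .updateStateByKey(updateFunction)
runningCounts.pprint()

ssc.start()
ssc.awaitTermination()
{% endhighlight %}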
#### Transform Operation
{:.no_toc}
@ -732,6 +877,15 @@ JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform(
});
{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
{% highlight python %}
spamInfoRDD = sc.pickleFile(...) # RDD containing spam information
# join data stream with spam information to do data cleaning
cleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(...))
{% endhighlight %}
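Purely as an illustration, the elided pieces might look like the sketch below, assuming `spamInfoRDD` holds `(word, isSpam)` pairs; the file path, the field layout, and the use of `leftOuterJoin` (so that words absent from the blacklist are kept) are all assumptions:
{% highlight python %}
spamInfoRDD = sc.pickleFile("hdfs:///tmp/spam-words")  # assumed path; RDD of (word, isSpam) pairs

def dropSpam(rdd):
    # rdd holds (word, count); leftOuterJoin yields (word, (count, isSpam or None))
    return (rdd.leftOuterJoin(spamInfoRDD)
               .filter(lambda kv: not kv[1][1])
               .map(lambda kv: (kv[0], kv[1][0])))

cleanedDStream = wordCounts.transform(dropSpam)
{% endhighlight %}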
</div>
</div>
@ -793,6 +947,14 @@ Function2<Integer, Integer, Integer> reduceFunc = new Function2<Integer, Integer
JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow(reduceFunc, new Duration(30000), new Duration(10000));
{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
{% highlight python %}
# Reduce last 30 seconds of data, every 10 seconds
windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)
{% endhighlight %}
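Two hedged notes on this form: in the Python API the window and slide durations are given as plain numbers of seconds, and supplying an inverse reduce function requires checkpointing to be enabled. A sketch (the checkpoint directory name is a placeholder):
{% highlight python %}
ssc.checkpoint("checkpoint")  # placeholder directory; needed when an inverse function is used

windowedWordCounts = pairs.reduceByKeyAndWindow(
    lambda x, y: x + y,  # add counts entering the window
    lambda x, y: x - y,  # subtract counts leaving the window
    30, 10)              # 30-second window, sliding every 10 seconds
windowedWordCounts.pprint()
{% endhighlight %}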
</div>
</div>
@ -860,6 +1022,7 @@ see [DStream](api/scala/index.html#org.apache.spark.streaming.dstream.DStream)
and [PairDStreamFunctions](api/scala/index.html#org.apache.spark.streaming.dstream.PairDStreamFunctions).
For the Java API, see [JavaDStream](api/java/index.html?org/apache/spark/streaming/api/java/JavaDStream.html)
and [JavaPairDStream](api/java/index.html?org/apache/spark/streaming/api/java/JavaPairDStream.html).
For the Python API, see [DStream](api/python/pyspark.streaming.html#pyspark.streaming.DStream).
***
@ -872,9 +1035,12 @@ Currently, the following output operations are defined:
<table class="table">
<tr><th style="width:30%">Output Operation</th><th>Meaning</th></tr>
<tr>
<td> <b>print</b>()</td>
<td> Prints first ten elements of every batch of data in a DStream on the driver.
This is useful for development and debugging.
<br/>
<b>PS</b>: called <b>pprint</b>() in the Python API.
</td>
</tr>
<tr>
<td> <b>saveAsObjectFiles</b>(<i>prefix</i>, [<i>suffix</i>]) </td>
@ -915,17 +1081,41 @@ For this purpose, a developer may inadvertently try creating a connection object at
the Spark driver, but try to use it in a Spark worker to save records in the RDDs.
For example (in Scala),
<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}
dstream.foreachRDD(rdd => {
val connection = createNewConnection() // executed at the driver
rdd.foreach(record => {
connection.send(record) // executed at the worker
})
})
{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
{% highlight python %}
def sendRecord(rdd):
    connection = createNewConnection()  # executed at the driver
    rdd.foreach(lambda record: connection.send(record))
    connection.close()

dstream.foreachRDD(sendRecord)
{% endhighlight %}
</div>
</div>
This is incorrect as this requires the connection object to be serialized and sent from the driver to the worker. Such connection objects are rarely transferable across machines. This error may manifest as serialization errors (connection object not serializable), initialization errors (connection object needs to be initialized at the workers), etc. The correct solution is to create the connection object at the worker.
- However, this can lead to another common mistake - creating a new connection for every record. For example,
<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}
dstream.foreachRDD(rdd => {
rdd.foreach(record => {
val connection = createNewConnection()
@ -933,9 +1123,28 @@ For example (in Scala),
connection.close()
})
})
{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
{% highlight python %}
def sendRecord(record):
    connection = createNewConnection()
    connection.send(record)
    connection.close()

dstream.foreachRDD(lambda rdd: rdd.foreach(sendRecord))
{% endhighlight %}
</div>
</div>
Typically, creating a connection object has time and resource overheads. Therefore, creating and destroying a connection object for each record can incur unnecessarily high overheads and can significantly reduce the overall throughput of the system. A better solution is to use `rdd.foreachPartition`: create a single connection object and send all the records in an RDD partition using that connection.
<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}
dstream.foreachRDD(rdd => {
rdd.foreachPartition(partitionOfRecords => {
val connection = createNewConnection()
@ -943,13 +1152,31 @@ For example (in Scala),
connection.close()
})
})
{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
{% highlight python %}
def sendPartition(iter):
    connection = createNewConnection()
    for record in iter:
        connection.send(record)
    connection.close()

dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
{% endhighlight %}
</div>
</div>
This amortizes the connection creation overheads over many records.
- Finally, this can be further optimized by reusing connection objects across multiple RDDs/batches.
One can maintain a static pool of connection objects that can be reused as
RDDs of multiple batches are pushed to the external system, thus further reducing the overheads.
<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}
dstream.foreachRDD(rdd => {
rdd.foreachPartition(partitionOfRecords => {
// ConnectionPool is a static, lazily initialized pool of connections
@ -958,8 +1185,25 @@ For example (in Scala),
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
})
})
{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
{% highlight python %}
def sendPartition(iter):
    # ConnectionPool is a static, lazily initialized pool of connections
    connection = ConnectionPool.getConnection()
    for record in iter:
        connection.send(record)
    # return to the pool for future reuse
    ConnectionPool.returnConnection(connection)

dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
{% endhighlight %}
</div>
</div>
Note that the connections in the pool should be lazily created on demand and timed out if not used for a while. This achieves the most efficient sending of data to external systems.
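`ConnectionPool` above is not a Spark API; it stands for any user-maintained pool. Purely as an illustration, a minimal per-executor pool in Python might look like the following, reusing the assumed `createNewConnection()` factory from the earlier snippets (a production pool would also time out idle connections, as noted above):
{% highlight python %}
class ConnectionPool(object):
    """Hypothetical static pool: connections are created lazily and reused."""
    _pool = []  # one pool per executor process

    @classmethod
    def getConnection(cls):
        if cls._pool:
            return cls._pool.pop()
        return createNewConnection()  # assumed user-supplied factory

    @classmethod
    def returnConnection(cls, connection):
        cls._pool.append(connection)
{% endhighlight %}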
##### Other points to remember:
@ -1376,6 +1620,44 @@ You can also explicitly create a `JavaStreamingContext` from the checkpoint data
the computation by using `new JavaStreamingContext(checkpointDirectory)`.
</div>
<div data-lang="python" markdown="1">
This behavior is made simple by using `StreamingContext.getOrCreate`. This is used as follows.
{% highlight python %}
# Function to create and setup a new StreamingContext
def functionToCreateContext():
    sc = SparkContext(...)  # new context
    ssc = StreamingContext(...)
    lines = ssc.socketTextStream(...)  # create DStreams
    ...
    ssc.checkpoint(checkpointDirectory)  # set checkpoint directory
    return ssc

# Get StreamingContext from checkpoint data or create a new one
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

# Do additional setup on context that needs to be done,
# irrespective of whether it is being started or restarted
context. ...

# Start the context
context.start()
context.awaitTermination()
{% endhighlight %}
If the `checkpointDirectory` exists, then the context will be recreated from the checkpoint data.
If the directory does not exist (i.e., running for the first time),
then the function `functionToCreateContext` will be called to create a new
context and set up the DStreams. See the Python example
[recoverable_network_wordcount.py]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python/streaming/recoverable_network_wordcount.py).
This example appends the word counts of network data into a file.
You can also explicitly create a `StreamingContext` from the checkpoint data and start the
computation by using `StreamingContext.getOrCreate(checkpointDirectory, None)`.
</div>
</div>
**Note**: If Spark Streaming and/or the Spark Streaming program is recompiled,
@ -1572,7 +1854,11 @@ package and renamed for better clarity.
[TwitterUtils](api/java/index.html?org/apache/spark/streaming/twitter/TwitterUtils.html),
[ZeroMQUtils](api/java/index.html?org/apache/spark/streaming/zeromq/ZeroMQUtils.html), and
[MQTTUtils](api/java/index.html?org/apache/spark/streaming/mqtt/MQTTUtils.html)
- Python docs
* [StreamingContext](api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext)
* [DStream](api/python/pyspark.streaming.html#pyspark.streaming.DStream)
* More examples in [Scala]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming),
  [Java]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/streaming),
  and [Python]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python/streaming)
* [Paper](http://www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf) and [video](http://youtu.be/g171ndOHgJ0) describing Spark Streaming.


@ -0,0 +1,80 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Counts words in text encoded with UTF8 received from the network every second.
Usage: recoverable_network_wordcount.py <hostname> <port> <checkpoint-directory> <output-file>
<hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
data. <checkpoint-directory> is a directory in an HDFS-compatible file system to which checkpoint
data will be saved, and <output-file> is the file to which the word counts will be appended.
To run this on your local machine, you need to first run a Netcat server
`$ nc -lk 9999`
and then run the example
`$ bin/spark-submit examples/src/main/python/streaming/recoverable_network_wordcount.py \
localhost 9999 ~/checkpoint/ ~/out`
If the directory ~/checkpoint/ does not exist (e.g. running for the first time), it will create
a new StreamingContext (will print "Creating new context" to the console). Otherwise, if
checkpoint data exists in ~/checkpoint/, then it will create StreamingContext from
the checkpoint data.
"""
import os
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def createContext(host, port, outputPath):
    # If you do not see this printed, that means the StreamingContext has been loaded
    # from the new checkpoint
    print "Creating new context"
    if os.path.exists(outputPath):
        os.remove(outputPath)
    sc = SparkContext(appName="PythonStreamingRecoverableNetworkWordCount")
    ssc = StreamingContext(sc, 1)

    # Create a socket stream on target ip:port and count the
    # words in input stream of \n delimited text (eg. generated by 'nc')
    lines = ssc.socketTextStream(host, port)
    words = lines.flatMap(lambda line: line.split(" "))
    wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)

    def echo(time, rdd):
        counts = "Counts at time %s %s" % (time, rdd.collect())
        print counts
        print "Appending to " + os.path.abspath(outputPath)
        with open(outputPath, 'a') as f:
            f.write(counts + "\n")

    wordCounts.foreachRDD(echo)
    return ssc


if __name__ == "__main__":
    if len(sys.argv) != 5:
        print >> sys.stderr, "Usage: recoverable_network_wordcount.py <hostname> <port> "\
            "<checkpoint-directory> <output-file>"
        exit(-1)
    host, port, checkpoint, output = sys.argv[1:]
    ssc = StreamingContext.getOrCreate(checkpoint,
                                       lambda: createContext(host, int(port), output))
    ssc.start()
    ssc.awaitTermination()


@ -0,0 +1,10 @@
pyspark.streaming module
========================
Module contents
---------------
.. automodule:: pyspark.streaming
    :members:
    :undoc-members:
    :show-inheritance:


@ -441,9 +441,11 @@ class DStream(object):
if `invReduceFunc` is not None, the reduction is done incrementally
using the old window's reduced value:
1. reduce the new values that entered the window (e.g., adding new counts)
2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
This is more efficient than when `invReduceFunc` is None.
@param reduceFunc: associative reduce function
@param invReduceFunc: inverse reduce function of `reduceFunc`