diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index 1fcd198685..38b4f78177 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -1246,6 +1246,22 @@ dstream.foreachRDD { rdd => } {% endhighlight %} +
+{% highlight java %} +dstream.foreachRDD(new VoidFunction>() { + @Override + public void call(JavaRDD rdd) { + final Connection connection = createNewConnection(); // executed at the driver + rdd.foreach(new VoidFunction() { + @Override + public void call(String record) { + connection.send(record); // executed at the worker + } + }); + } +}); +{% endhighlight %} +
{% highlight python %} def sendRecord(rdd): @@ -1279,6 +1295,23 @@ dstream.foreachRDD { rdd => } {% endhighlight %}
+
+{% highlight java %} +dstream.foreachRDD(new VoidFunction>() { + @Override + public void call(JavaRDD rdd) { + rdd.foreach(new VoidFunction() { + @Override + public void call(String record) { + Connection connection = createNewConnection(); + connection.send(record); + connection.close(); + } + }); + } +}); +{% endhighlight %} +
{% highlight python %} def sendRecord(record): @@ -1309,6 +1342,25 @@ dstream.foreachRDD { rdd => } {% endhighlight %}
+
+{% highlight java %} +dstream.foreachRDD(new VoidFunction>() { + @Override + public void call(JavaRDD rdd) { + rdd.foreachPartition(new VoidFunction>() { + @Override + public void call(Iterator partitionOfRecords) { + Connection connection = createNewConnection(); + while (partitionOfRecords.hasNext()) { + connection.send(partitionOfRecords.next()); + } + connection.close(); + } + }); + } +}); +{% endhighlight %} +
{% highlight python %} def sendPartition(iter): @@ -1342,6 +1394,26 @@ dstream.foreachRDD { rdd => {% endhighlight %}
+
+{% highlight java %} +dstream.foreachRDD(new VoidFunction>() { + @Override + public void call(JavaRDD rdd) { + rdd.foreachPartition(new VoidFunction>() { + @Override + public void call(Iterator partitionOfRecords) { + // ConnectionPool is a static, lazily initialized pool of connections + Connection connection = ConnectionPool.getConnection(); + while (partitionOfRecords.hasNext()) { + connection.send(partitionOfRecords.next()); + } + ConnectionPool.returnConnection(connection); // return to the pool for future reuse + } + }); + } +}); +{% endhighlight %} +
{% highlight python %} def sendPartition(iter):