From 12b72b3e73798a5a2cc6c745610e135b1d6825a6 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Thu, 17 Jan 2013 22:37:56 -0800 Subject: [PATCH] NetworkWordCount example --- .../examples/JavaNetworkWordCount.java | 62 +++++++++++++++++++ .../streaming/examples/NetworkWordCount.scala | 2 +- 2 files changed, 63 insertions(+), 1 deletion(-) create mode 100644 examples/src/main/scala/spark/streaming/examples/JavaNetworkWordCount.java diff --git a/examples/src/main/scala/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/scala/spark/streaming/examples/JavaNetworkWordCount.java new file mode 100644 index 0000000000..4299febfd6 --- /dev/null +++ b/examples/src/main/scala/spark/streaming/examples/JavaNetworkWordCount.java @@ -0,0 +1,62 @@ +package spark.streaming.examples; + +import com.google.common.collect.Lists; +import scala.Tuple2; +import spark.api.java.function.FlatMapFunction; +import spark.api.java.function.Function2; +import spark.api.java.function.PairFunction; +import spark.streaming.Duration; +import spark.streaming.api.java.JavaDStream; +import spark.streaming.api.java.JavaPairDStream; +import spark.streaming.api.java.JavaStreamingContext; + +/** + * Counts words in UTF8 encoded, '\n' delimited text received from the network every second. + * Usage: NetworkWordCount + * is the Spark master URL. In local mode, should be 'local[n]' with n > 1. + * and describe the TCP server that Spark Streaming would connect to receive data. + * + * To run this on your local machine, you need to first run a Netcat server + * `$ nc -lk 9999` + * and then run the example + * `$ ./run spark.streaming.examples.JavaNetworkWordCount local[2] localhost 9999` + */ +public class JavaNetworkWordCount { + public static void main(String[] args) { + if (args.length < 2) { + System.err.println("Usage: NetworkWordCount \n" + + "In local mode, should be 'local[n]' with n > 1"); + System.exit(1); + } + + // Create the context with a 1 second batch size + JavaStreamingContext ssc = new JavaStreamingContext( + args[0], "NetworkWordCount", new Duration(1000)); + + // Create a NetworkInputDStream on target ip:port and count the + // words in input stream of \n delimited test (eg. generated by 'nc') + JavaDStream lines = ssc.networkTextStream(args[1], Integer.parseInt(args[2])); + JavaDStream words = lines.flatMap(new FlatMapFunction() { + @Override + public Iterable call(String x) { + return Lists.newArrayList(x.split(" ")); + } + }); + JavaPairDStream wordCounts = words.map( + new PairFunction() { + @Override + public Tuple2 call(String s) throws Exception { + return new Tuple2(s, 1); + } + }).reduceByKey(new Function2() { + @Override + public Integer call(Integer i1, Integer i2) throws Exception { + return i1 + i2; + } + }); + + wordCounts.print(); + ssc.start(); + + } +} diff --git a/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala index 43c01d5db2..32f7d57bea 100644 --- a/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala +++ b/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala @@ -22,7 +22,7 @@ object NetworkWordCount { System.exit(1) } - // Create the context and set the batch size + // Create the context with a 1 second batch size val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1)) // Create a NetworkInputDStream on target ip:port and count the