From 9de1c3abf90fff82901c1ee13297d436d2d7a25d Mon Sep 17 00:00:00 2001
From: root
Date: Mon, 27 Aug 2012 00:57:00 +0000
Subject: [PATCH 1/2] Tweaks to WordCount2

---
 .../spark/streaming/examples/WordCount2.scala | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)

diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala
index 83cbd31283..87b62817ea 100644
--- a/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala
@@ -70,23 +70,30 @@ object WordCount2 {
 
   def main (args: Array[String]) {
 
-    if (args.length < 2) {
-      println ("Usage: SparkStreamContext <host> <file>")
+    if (args.length < 4) {
+      println ("Usage: SparkStreamContext <host> <file> <mapTasks> <reduceTasks>")
       System.exit(1)
     }
+
+    val Array(master, file, mapTasks, reduceTasks) = args
 
-    val ssc = new SparkStreamContext(args(0), "WordCount2")
+    val ssc = new SparkStreamContext(master, "WordCount2")
     ssc.setBatchDuration(Seconds(1))
+
+    val data = ssc.sc.textFile(file, mapTasks.toInt).persist(StorageLevel.MEMORY_ONLY_DESER_2)
+    println("Data count: " + data.count())
+    println("Data count: " + data.count())
+    println("Data count: " + data.count())
 
-    val sentences = new ConstantInputDStream(ssc, ssc.sc.textFile(args(1)).cache())
+    val sentences = new ConstantInputDStream(ssc, data)
     ssc.inputStreams += sentences
 
     import WordCount2_ExtraFunctions._
 
     val windowedCounts = sentences
       .mapPartitions(splitAndCountPartitions)
-      .reduceByKeyAndWindow(add _, subtract _, Seconds(10), Seconds(1), 10)
-    windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER, Seconds(10))
+      .reduceByKeyAndWindow(add _, subtract _, Seconds(10), Seconds(1), reduceTasks.toInt)
+    windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER_2, Seconds(10))
     windowedCounts.print()
 
     ssc.start()
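The helpers this patch imports from WordCount2_ExtraFunctions (splitAndCountPartitions, add, subtract) are not shown in the diff. A minimal sketch of what they presumably look like, for context -- the bodies here are an assumption, only the names come from the patch. The key property is that `subtract` is the inverse of `add`, which is what lets reduceByKeyAndWindow maintain the 10-second window incrementally: add the counts of the batch entering the window, subtract the counts of the batch leaving it, rather than re-reducing all ten batches on every slide.

    object WordCount2_ExtraFunctions {
      // Invertible reduce functions: subtract undoes add, enabling
      // incremental window updates in reduceByKeyAndWindow.
      def add(v1: Long, v2: Long): Long = v1 + v2
      def subtract(v1: Long, v2: Long): Long = v1 - v2

      // Pre-aggregate word counts within each partition to cut down
      // the volume of data shuffled for the reduce.
      def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, Long)] = {
        val counts = scala.collection.mutable.HashMap[String, Long]().withDefaultValue(0L)
        for (line <- iter; word <- line.split(" ") if word.nonEmpty)
          counts(word) += 1
        counts.iterator
      }
    }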
From e2cf197a0a878f9c942dafe98a70bdaefb5df58d Mon Sep 17 00:00:00 2001
From: root
Date: Mon, 27 Aug 2012 03:34:15 +0000
Subject: [PATCH 2/2] Made WordCount2 even more configurable

---
 .../scala/spark/streaming/examples/WordCount2.scala | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala
index 87b62817ea..1afe87e723 100644
--- a/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala
@@ -70,15 +70,17 @@ object WordCount2 {
 
   def main (args: Array[String]) {
 
-    if (args.length < 4) {
-      println ("Usage: SparkStreamContext <host> <file> <mapTasks> <reduceTasks>")
+    if (args.length != 5) {
+      println ("Usage: SparkStreamContext <host> <file> <mapTasks> <reduceTasks> <batchMillis>")
       System.exit(1)
     }
 
-    val Array(master, file, mapTasks, reduceTasks) = args
+    val Array(master, file, mapTasks, reduceTasks, batchMillis) = args
+
+    val BATCH_DURATION = Milliseconds(batchMillis.toLong)
 
     val ssc = new SparkStreamContext(master, "WordCount2")
-    ssc.setBatchDuration(Seconds(1))
+    ssc.setBatchDuration(BATCH_DURATION)
 
     val data = ssc.sc.textFile(file, mapTasks.toInt).persist(StorageLevel.MEMORY_ONLY_DESER_2)
     println("Data count: " + data.count())
@@ -92,7 +94,7 @@ object WordCount2 {
 
     val windowedCounts = sentences
       .mapPartitions(splitAndCountPartitions)
-      .reduceByKeyAndWindow(add _, subtract _, Seconds(10), Seconds(1), reduceTasks.toInt)
+      .reduceByKeyAndWindow(add _, subtract _, Seconds(10), BATCH_DURATION, reduceTasks.toInt)
     windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER_2, Seconds(10))
     windowedCounts.print()
 
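To make the incremental update concrete, here is a self-contained sketch of the per-slide arithmetic reduceByKeyAndWindow performs with an invertible reduce, now that the slide interval tracks the configurable batch duration. The object name and the counts are made up for illustration; this runs without Spark.

    object WindowUpdateSketch {
      def main(args: Array[String]): Unit = {
        // Hypothetical per-word counts: the current window, the batch
        // sliding into it, and the batch sliding out of it.
        val oldWindow = Map("spark" -> 7L, "streaming" -> 3L)
        val entering  = Map("spark" -> 2L)
        val leaving   = Map("spark" -> 1L, "streaming" -> 3L)

        // Apply `add` for the entering batch ...
        val added = (oldWindow.keySet ++ entering.keySet).map { k =>
          k -> (oldWindow.getOrElse(k, 0L) + entering.getOrElse(k, 0L))
        }.toMap
        // ... then `subtract` for the leaving batch, dropping zeroed keys,
        // instead of re-reducing every batch in the window.
        val newWindow = added
          .map { case (k, v) => k -> (v - leaving.getOrElse(k, 0L)) }
          .filter(_._2 > 0)

        println(newWindow) // Map(spark -> 8)
      }
    }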