From bf993cda632e4a9fa41035845491ae466d1a4431 Mon Sep 17 00:00:00 2001 From: root Date: Sat, 1 Sep 2012 19:59:23 +0000 Subject: [PATCH] Make batch size configurable in CountRaw --- .../scala/spark/streaming/examples/CountRaw.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/streaming/src/main/scala/spark/streaming/examples/CountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/CountRaw.scala index 17d1ce3602..c78c1e9660 100644 --- a/streaming/src/main/scala/spark/streaming/examples/CountRaw.scala +++ b/streaming/src/main/scala/spark/streaming/examples/CountRaw.scala @@ -7,16 +7,16 @@ import spark.streaming.StreamingContext._ object CountRaw { def main(args: Array[String]) { - if (args.length < 4) { - System.err.println("Usage: WordCountNetwork ") + if (args.length != 5) { + System.err.println("Usage: CountRaw ") System.exit(1) } - val Array(master, IntParam(numStreams), hostname, IntParam(port)) = args + val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args // Create the context and set the batch size val ssc = new StreamingContext(master, "CountRaw") - ssc.setBatchDuration(Seconds(1)) + ssc.setBatchDuration(Milliseconds(batchMillis)) // Make sure some tasks have started on each node ssc.sc.parallelize(1 to 1000, 1000).count() @@ -24,9 +24,9 @@ object CountRaw { ssc.sc.parallelize(1 to 1000, 1000).count() val rawStreams = (1 to numStreams).map(_ => - ssc.createRawNetworkStream[String](hostname, port, StorageLevel.MEMORY_ONLY_2)).toArray + ssc.createRawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray val union = new UnifiedDStream(rawStreams) - union.map(_.length).reduce(_ + _).foreachRDD(r => println("Byte count: " + r.collect().mkString)) + union.map(_.length + 2).reduce(_ + _).foreachRDD(r => println("Byte count: " + r.collect().mkString)) ssc.start() } }