Make batch size configurable in RawCount

root 2012-09-01 19:59:23 +00:00
parent 83dad56334
commit bf993cda63


@@ -7,16 +7,16 @@ import spark.streaming.StreamingContext._
 
 object CountRaw {
   def main(args: Array[String]) {
-    if (args.length < 4) {
-      System.err.println("Usage: WordCountNetwork <master> <numStreams> <hostname> <port>")
+    if (args.length != 5) {
+      System.err.println("Usage: CountRaw <master> <numStreams> <host> <port> <batchMillis>")
       System.exit(1)
     }
 
-    val Array(master, IntParam(numStreams), hostname, IntParam(port)) = args
+    val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args
 
     // Create the context and set the batch size
     val ssc = new StreamingContext(master, "CountRaw")
-    ssc.setBatchDuration(Seconds(1))
+    ssc.setBatchDuration(Milliseconds(batchMillis))
 
     // Make sure some tasks have started on each node
     ssc.sc.parallelize(1 to 1000, 1000).count()
@@ -24,9 +24,9 @@ object CountRaw {
     ssc.sc.parallelize(1 to 1000, 1000).count()
 
     val rawStreams = (1 to numStreams).map(_ =>
-      ssc.createRawNetworkStream[String](hostname, port, StorageLevel.MEMORY_ONLY_2)).toArray
+      ssc.createRawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray
    val union = new UnifiedDStream(rawStreams)
-    union.map(_.length).reduce(_ + _).foreachRDD(r => println("Byte count: " + r.collect().mkString))
+    union.map(_.length + 2).reduce(_ + _).foreachRDD(r => println("Byte count: " + r.collect().mkString))
     ssc.start()
   }
 }
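
For reference, a minimal sketch of how the updated example might be invoked with the new fifth argument; the master URL, stream count, host, port, and batch interval below are illustrative values only, not part of this commit, and they assume a raw-stream sender is already serving data on that port:

    // Hypothetical invocation: 4 raw network streams from localhost:9999,
    // counted in 500 ms batches (batchMillis is the new fifth argument).
    CountRaw.main(Array("local[8]", "4", "localhost", "9999", "500"))

Before this change the batch duration was hard-coded to Seconds(1); passing batchMillis through Milliseconds(batchMillis) lets benchmarks vary the batch interval without recompiling.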