Merge branch 'dev' of github.com:radlab/spark into dev
This commit is contained in:
commit
a0b34d826a
|
@@ -70,23 +70,32 @@ object WordCount2 {
|
|||
|
||||
def main (args: Array[String]) {
|
||||
|
||||
if (args.length < 2) {
|
||||
println ("Usage: SparkStreamContext <host> <file>")
|
||||
if (args.length != 5) {
|
||||
println ("Usage: SparkStreamContext <host> <file> <mapTasks> <reduceTasks> <batchMillis>")
|
||||
System.exit(1)
|
||||
}
|
||||
|
||||
val Array(master, file, mapTasks, reduceTasks, batchMillis) = args
|
||||
|
||||
val BATCH_DURATION = Milliseconds(batchMillis.toLong)
|
||||
|
||||
val ssc = new SparkStreamContext(args(0), "WordCount2")
|
||||
ssc.setBatchDuration(Seconds(1))
|
||||
val ssc = new SparkStreamContext(master, "WordCount2")
|
||||
ssc.setBatchDuration(BATCH_DURATION)
|
||||
|
||||
val data = ssc.sc.textFile(file, mapTasks.toInt).persist(StorageLevel.MEMORY_ONLY_DESER_2)
|
||||
println("Data count: " + data.count())
|
||||
println("Data count: " + data.count())
|
||||
println("Data count: " + data.count())
|
||||
|
||||
val sentences = new ConstantInputDStream(ssc, ssc.sc.textFile(args(1)).cache())
|
||||
val sentences = new ConstantInputDStream(ssc, data)
|
||||
ssc.inputStreams += sentences
|
||||
|
||||
import WordCount2_ExtraFunctions._
|
||||
|
||||
val windowedCounts = sentences
|
||||
.mapPartitions(splitAndCountPartitions)
|
||||
.reduceByKeyAndWindow(add _, subtract _, Seconds(10), Seconds(1), 10)
|
||||
windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER, Seconds(10))
|
||||
.reduceByKeyAndWindow(add _, subtract _, Seconds(10), BATCH_DURATION, reduceTasks.toInt)
|
||||
windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER_2, Seconds(10))
|
||||
windowedCounts.print()
|
||||
|
||||
ssc.start()
|
||||
|
|
Loading…
Reference in a new issue