Various test programs

root 2012-09-04 04:26:53 +00:00
parent ceabf71257
commit 1878731671
3 changed files with 143 additions and 2 deletions

View file

@@ -0,0 +1,64 @@
+package spark.streaming.examples
+
+import spark.SparkContext
+import SparkContext._
+import spark.streaming._
+import StreamingContext._
+
+import spark.storage.StorageLevel
+
+import scala.util.Sorting
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.Queue
+import scala.collection.JavaConversions.mapAsScalaMap
+
+import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
+
+object Grep2 {
+
+  def warmup(sc: SparkContext) {
+    (0 until 10).foreach {i =>
+      sc.parallelize(1 to 20000000, 1000)
+        .map(x => (x % 337, x % 1331))
+        .reduceByKey(_ + _)
+        .count()
+    }
+  }
+
+  def main (args: Array[String]) {
+
+    if (args.length != 6) {
+      println ("Usage: Grep2 <host> <file> <mapTasks> <reduceTasks> <batchMillis> <chkptMillis>")
+      System.exit(1)
+    }
+
+    val Array(master, file, mapTasks, reduceTasks, batchMillis, chkptMillis) = args
+
+    val batchDuration = Milliseconds(batchMillis.toLong)
+
+    val ssc = new StreamingContext(master, "Grep2")
+    ssc.setBatchDuration(batchDuration)
+
+    //warmup(ssc.sc)
+
+    val data = ssc.sc.textFile(file, mapTasks.toInt).persist(
+      new StorageLevel(false, true, false, 3)) // Memory only, serialized, 3 replicas
+    println("Data count: " + data.count())
+    println("Data count: " + data.count())
+    println("Data count: " + data.count())
+
+    val sentences = new ConstantInputDStream(ssc, data)
+    ssc.inputStreams += sentences
+
+    sentences.filter(_.contains("Culpepper")).count().foreachRDD(r =>
+      println("Grep count: " + r.collect().mkString))
+
+    ssc.start()
+
+    while(true) { Thread.sleep(1000) }
+  }
+}

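Note on the storage levels used above: new StorageLevel(false, true, false, 3) builds the level flag by flag rather than using a named constant. A minimal sketch of what the four arguments appear to mean, with parameter names inferred from the "Memory only, serialized, 3 replicas" comment in the diff rather than from StorageLevel's own source:

    import spark.storage.StorageLevel

    // Sketch only: argument meanings are assumptions based on the inline
    // comment in the diff above, not on the StorageLevel definition itself.
    object StorageLevelSketch {
      val memoryOnlySerialized3x = new StorageLevel(
        false, // useDisk: never spill blocks to disk
        true,  // useMemory: cache blocks in memory
        false, // deserialized: false, so blocks stay as serialized bytes
        3)     // replication: keep three copies of each block across the cluster
    }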
View file

@@ -22,6 +22,8 @@ object WordCount2_ExtraFunctions {
   def subtract(v1: Long, v2: Long) = (v1 - v2)
+  def max(v1: Long, v2: Long) = math.max(v1, v2)
   def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, Long)] = {
     //val map = new java.util.HashMap[String, Long]
     val map = new OLMap[String]
@@ -85,7 +87,7 @@ object WordCount2 {
     //warmup(ssc.sc)
     val data = ssc.sc.textFile(file, mapTasks.toInt).persist(
-      new StorageLevel(false, true, false, 2)) // Memory only, serialized, 2 replicas
+      new StorageLevel(false, true, false, 3)) // Memory only, serialized, 3 replicas
     println("Data count: " + data.count())
     println("Data count: " + data.count())
     println("Data count: " + data.count())
@@ -98,7 +100,9 @@ object WordCount2 {
     val windowedCounts = sentences
       .mapPartitions(splitAndCountPartitions)
       .reduceByKeyAndWindow(add _, subtract _, Seconds(30), batchDuration, reduceTasks.toInt)
-    windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER_2,
+    windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER,
+      StorageLevel.MEMORY_ONLY_DESER_2,
+      //new StorageLevel(false, true, true, 3),
       Milliseconds(chkptMillis.toLong))
     windowedCounts.foreachRDD(r => println("Element count: " + r.count()))

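For context on the hunk above: reduceByKeyAndWindow(add _, subtract _, Seconds(30), batchDuration, reduceTasks.toInt) is the incremental form of windowed reduction. Each batch interval, the counts of the batch entering the 30-second window are added and the counts of the batch that just slid out are subtracted, instead of re-reducing every batch still in the window. A rough illustration of that update on plain immutable Maps (a sketch for exposition only, not part of the patch):

    // Illustrative only: models the add/subtract sliding-window update.
    object WindowSketch {
      def add(v1: Long, v2: Long) = v1 + v2
      def subtract(v1: Long, v2: Long) = v1 - v2

      def slideWindow(previous: Map[String, Long],   // counts over the old window
                      entering: Map[String, Long],   // counts of the batch entering
                      leaving: Map[String, Long]     // counts of the batch leaving
                     ): Map[String, Long] = {
        val added = entering.foldLeft(previous) { case (acc, (k, v)) =>
          acc.updated(k, add(acc.getOrElse(k, 0L), v))
        }
        leaving.foldLeft(added) { case (acc, (k, v)) =>
          acc.updated(k, subtract(acc.getOrElse(k, 0L), v))
        }
      }
    }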
View file

@@ -0,0 +1,73 @@
+package spark.streaming.examples
+
+import spark.SparkContext
+import SparkContext._
+import spark.streaming._
+import StreamingContext._
+
+import spark.storage.StorageLevel
+
+import scala.util.Sorting
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.Queue
+import scala.collection.JavaConversions.mapAsScalaMap
+
+import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
+
+object WordMax2 {
+
+  def warmup(sc: SparkContext) {
+    (0 until 10).foreach {i =>
+      sc.parallelize(1 to 20000000, 1000)
+        .map(x => (x % 337, x % 1331))
+        .reduceByKey(_ + _)
+        .count()
+    }
+  }
+
+  def main (args: Array[String]) {
+
+    if (args.length != 6) {
+      println ("Usage: WordMax2 <host> <file> <mapTasks> <reduceTasks> <batchMillis> <chkptMillis>")
+      System.exit(1)
+    }
+
+    val Array(master, file, mapTasks, reduceTasks, batchMillis, chkptMillis) = args
+
+    val batchDuration = Milliseconds(batchMillis.toLong)
+
+    val ssc = new StreamingContext(master, "WordMax2")
+    ssc.setBatchDuration(batchDuration)
+
+    //warmup(ssc.sc)
+
+    val data = ssc.sc.textFile(file, mapTasks.toInt).persist(
+      new StorageLevel(false, true, false, 3)) // Memory only, serialized, 3 replicas
+    println("Data count: " + data.count())
+    println("Data count: " + data.count())
+    println("Data count: " + data.count())
+
+    val sentences = new ConstantInputDStream(ssc, data)
+    ssc.inputStreams += sentences
+
+    import WordCount2_ExtraFunctions._
+
+    val windowedCounts = sentences
+      .mapPartitions(splitAndCountPartitions)
+      .reduceByKey(add _, reduceTasks.toInt)
+      .persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER_2,
+        Milliseconds(chkptMillis.toLong))
+      .reduceByKeyAndWindow(max _, Seconds(10), batchDuration, reduceTasks.toInt)
+      //.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER_2,
+      //  Milliseconds(chkptMillis.toLong))
+    windowedCounts.foreachRDD(r => println("Element count: " + r.count()))
+
+    ssc.start()
+
+    while(true) { Thread.sleep(1000) }
+  }
+}
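One difference from WordCount2 worth noting: max has no inverse, so the reduceByKeyAndWindow(max _, Seconds(10), batchDuration, reduceTasks.toInt) call above cannot use the add/subtract style of incremental update and has to recombine every batch still inside the 10-second window; the per-batch reduceByKey(add _) and persist ahead of it keep those reused batch results cached. A small sketch of that recomputation on plain maps (illustrative only, not part of the patch):

    // Illustrative only: a sliding maximum is rebuilt from all batches in the
    // window, since an old batch cannot be "subtracted" back out of a max.
    object WindowMaxSketch {
      def max(v1: Long, v2: Long) = math.max(v1, v2)

      def windowMax(batchesInWindow: Seq[Map[String, Long]]): Map[String, Long] =
        batchesInWindow.foldLeft(Map.empty[String, Long]) { (acc, batch) =>
          batch.foldLeft(acc) { case (m, (k, v)) =>
            m.updated(k, max(m.getOrElse(k, Long.MinValue), v))
          }
        }
    }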