Further fixes to raw text sender, plus an app that uses it

This commit is contained in:
root 2012-09-01 19:45:25 +00:00
parent f84d2bbe55
commit 83dad56334
7 changed files with 56 additions and 5 deletions

View file

@ -176,7 +176,7 @@ extends Logging with Serializable {
def mapPartitions[U: ClassManifest](mapPartFunc: Iterator[T] => Iterator[U]) =
new MapPartitionedDStream(this, ssc.sc.clean(mapPartFunc))
def reduce(reduceFunc: (T, T) => T) = this.map(x => (1, x)).reduceByKey(reduceFunc, 1).map(_._2)
def reduce(reduceFunc: (T, T) => T) = this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2)
def count() = this.map(_ => 1).reduce(_ + _)

View file

@ -12,7 +12,7 @@ class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging {
SparkEnv.set(ssc.env)
try {
val timeTaken = job.run()
logInfo("Total delay: %.4f s for job %s; execution was %.4f s".format(
logInfo("Total delay: %.5f s for job %s (execution: %.5f s)".format(
(System.currentTimeMillis() - job.time) / 1000.0, job.id, timeTaken / 1000.0))
} catch {
case e: Exception =>

View file

@ -22,7 +22,8 @@ import spark.storage.StorageLevel
class RawInputDStream[T: ClassManifest](
@transient ssc: StreamingContext,
host: String,
port: Int)
port: Int,
storageLevel: StorageLevel)
extends NetworkInputDStream[T](ssc) with Logging {
val streamId = id
@ -49,7 +50,7 @@ class RawInputDStream[T: ClassManifest](
val buffer = queue.take()
val blockId = "input-" + streamId + "-" + nextBlockNumber
nextBlockNumber += 1
env.blockManager.putBytes(blockId, buffer, StorageLevel.MEMORY_ONLY_2)
env.blockManager.putBytes(blockId, buffer, storageLevel)
actor ! BlockPublished(blockId)
}
}

View file

@ -4,6 +4,7 @@ import spark.RDD
import spark.Logging
import spark.SparkEnv
import spark.SparkContext
import spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.Queue
@ -64,6 +65,16 @@ class StreamingContext (
inputStreams += inputStream
inputStream
}
def createRawNetworkStream[T: ClassManifest](
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_2
): DStream[T] = {
val inputStream = new RawInputDStream[T](this, hostname, port, storageLevel)
inputStreams += inputStream
inputStream
}
/*
def createHttpTextStream(url: String): DStream[String] = {

View file

@ -0,0 +1,32 @@
package spark.streaming.examples
import spark.util.IntParam
import spark.storage.StorageLevel
import spark.streaming._
import spark.streaming.StreamingContext._
object CountRaw {
def main(args: Array[String]) {
if (args.length < 4) {
System.err.println("Usage: WordCountNetwork <master> <numStreams> <hostname> <port>")
System.exit(1)
}
val Array(master, IntParam(numStreams), hostname, IntParam(port)) = args
// Create the context and set the batch size
val ssc = new StreamingContext(master, "CountRaw")
ssc.setBatchDuration(Seconds(1))
// Make sure some tasks have started on each node
ssc.sc.parallelize(1 to 1000, 1000).count()
ssc.sc.parallelize(1 to 1000, 1000).count()
ssc.sc.parallelize(1 to 1000, 1000).count()
val rawStreams = (1 to numStreams).map(_ =>
ssc.createRawNetworkStream[String](hostname, port, StorageLevel.MEMORY_ONLY_2)).toArray
val union = new UnifiedDStream(rawStreams)
union.map(_.length).reduce(_ + _).foreachRDD(r => println("Byte count: " + r.collect().mkString))
ssc.start()
}
}

View file

@ -85,7 +85,7 @@ object WordCount2 {
//warmup(ssc.sc)
val data = ssc.sc.textFile(file, mapTasks.toInt).persist(
new StorageLevel(false, true, true, 2)) // Memory only, deserialized, 2 replicas
new StorageLevel(false, true, false, 2)) // Memory only, serialized, 2 replicas
println("Data count: " + data.count())
println("Data count: " + data.count())
println("Data count: " + data.count())

View file

@ -1,5 +1,6 @@
package spark.streaming.util
import java.nio.ByteBuffer
import spark.util.{RateLimitedOutputStream, IntParam}
import java.net.ServerSocket
import spark.{Logging, KryoSerializer}
@ -33,7 +34,12 @@ object RawTextSender extends Logging {
bufferStream.trim()
val array = bufferStream.array
val countBuf = ByteBuffer.wrap(new Array[Byte](4))
countBuf.putInt(array.length)
countBuf.flip()
val serverSocket = new ServerSocket(port)
logInfo("Listening on port " + port)
while (true) {
val socket = serverSocket.accept()
@ -41,6 +47,7 @@ object RawTextSender extends Logging {
val out = new RateLimitedOutputStream(socket.getOutputStream, bytesPerSec)
try {
while (true) {
out.write(countBuf.array)
out.write(array)
}
} catch {