BufferingBlockCreator -> NetworkReceiver.BlockGenerator

Patrick Wendell, 2013-01-02 14:17:59 -08:00
parent 96a6ff0b09
commit 2ef993d159
5 changed files with 89 additions and 94 deletions
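The renamed class also changes shape: rather than a standalone helper that is handed a receiver reference, it becomes an inner class of NetworkReceiver. At every call site the edit is mechanical, as the hunks below show:

    // Before: standalone helper, explicitly bound to its receiver
    lazy val dataHandler = new BufferingBlockCreator(this, storageLevel)

    // After: inner class of NetworkReceiver, so the enclosing receiver is implicit
    lazy val blockGenerator = new BlockGenerator(storageLevel)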

File: BufferingBlockCreator.scala (deleted)

@@ -1,80 +0,0 @@
package spark.streaming

import java.util.concurrent.ArrayBlockingQueue
import scala.collection.mutable.ArrayBuffer
import spark.Logging
import spark.streaming.util.{RecurringTimer, SystemClock}
import spark.storage.StorageLevel

/**
 * Batches objects created by a [[spark.streaming.NetworkReceiver]] and puts them into
 * appropriately named blocks at regular intervals. This class starts two threads,
 * one to periodically start a new batch and prepare the previous batch as a block,
 * the other to push the blocks into the block manager.
 */
class BufferingBlockCreator[T](receiver: NetworkReceiver[T], storageLevel: StorageLevel)
  extends Serializable with Logging {

  case class Block(id: String, iterator: Iterator[T], metadata: Any = null)

  val clock = new SystemClock()
  val blockInterval = 200L // batching interval, in milliseconds
  val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer)
  val blockStorageLevel = storageLevel
  val blocksForPushing = new ArrayBlockingQueue[Block](1000)
  val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }

  var currentBuffer = new ArrayBuffer[T]

  def start() {
    blockIntervalTimer.start()
    blockPushingThread.start()
    logInfo("Data handler started")
  }

  def stop() {
    blockIntervalTimer.stop()
    blockPushingThread.interrupt()
    logInfo("Data handler stopped")
  }

  def += (obj: T) {
    currentBuffer += obj
  }

  private def createBlock(blockId: String, iterator: Iterator[T]) : Block = {
    new Block(blockId, iterator)
  }

  private def updateCurrentBuffer(time: Long) {
    try {
      val newBlockBuffer = currentBuffer
      currentBuffer = new ArrayBuffer[T]
      if (newBlockBuffer.size > 0) {
        val blockId = "input-" + receiver.streamId + "-" + (time - blockInterval)
        val newBlock = createBlock(blockId, newBlockBuffer.toIterator)
        blocksForPushing.add(newBlock)
      }
    } catch {
      case ie: InterruptedException =>
        logInfo("Block interval timer thread interrupted")
      case e: Exception =>
        receiver.stop()
    }
  }

  private def keepPushingBlocks() {
    logInfo("Block pushing thread started")
    try {
      while(true) {
        val block = blocksForPushing.take()
        receiver.pushBlock(block.id, block.iterator, block.metadata, storageLevel)
      }
    } catch {
      case ie: InterruptedException =>
        logInfo("Block pushing thread interrupted")
      case e: Exception =>
        receiver.stop()
    }
  }
}
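Every receiver in this commit drives the generator through the same three entry points; a composite lifecycle sketch, distilled from the Flume, Kafka, and socket hunks below (call sites illustrative, not verbatim):

    blockGenerator.start()    // in onStart(): starts the interval timer and the pushing thread
    blockGenerator += record  // from the receive loop, once per received object
    blockGenerator.stop()     // in onStop(): stops the timer and interrupts the pusher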

File: FlumeInputDStream.scala

@@ -97,13 +97,13 @@ private[streaming] object SparkFlumeEvent
 private[streaming]
 class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol {
   override def append(event : AvroFlumeEvent) : Status = {
-    receiver.dataHandler += SparkFlumeEvent.fromAvroFlumeEvent(event)
+    receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event)
     Status.OK
   }

   override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = {
     events.foreach (event =>
-      receiver.dataHandler += SparkFlumeEvent.fromAvroFlumeEvent(event))
+      receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event))
     Status.OK
   }
 }
@@ -118,19 +118,19 @@ class FlumeReceiver(
     storageLevel: StorageLevel
   ) extends NetworkReceiver[SparkFlumeEvent](streamId) {

-  lazy val dataHandler = new BufferingBlockCreator(this, storageLevel)
+  lazy val blockGenerator = new BlockGenerator(storageLevel)

   protected override def onStart() {
     val responder = new SpecificResponder(
       classOf[AvroSourceProtocol], new FlumeEventServer(this));
     val server = new NettyServer(responder, new InetSocketAddress(host, port));
-    dataHandler.start()
+    blockGenerator.start()
     server.start()
     logInfo("Flume receiver started")
   }

   protected override def onStop() {
-    dataHandler.stop()
+    blockGenerator.stop()
     logInfo("Flume receiver stopped")
   }

File: KafkaInputDStream.scala

@@ -110,19 +110,19 @@ class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String,
   val ZK_TIMEOUT = 10000

   // Handles pushing data into the BlockManager
-  lazy protected val dataHandler = new BufferingBlockCreator(this, storageLevel)
+  lazy protected val blockGenerator = new BlockGenerator(storageLevel)
   // Keeps track of the current offsets. Maps from (broker, topic, group, part) -> Offset
   lazy val offsets = HashMap[KafkaPartitionKey, Long]()
   // Connection to Kafka
   var consumerConnector : ZookeeperConsumerConnector = null

   def onStop() {
-    dataHandler.stop()
+    blockGenerator.stop()
   }

   def onStart() {
-    dataHandler.start()
+    blockGenerator.start()

     // In case we are using multiple Threads to handle Kafka Messages
     val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _))
@@ -170,8 +170,8 @@ class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String,
   private class MessageHandler(stream: KafkaStream[String]) extends Runnable {
     def run() {
       logInfo("Starting MessageHandler.")
       stream.takeWhile { msgAndMetadata =>
-        dataHandler += msgAndMetadata.message
+        blockGenerator += msgAndMetadata.message

         // Updating the offset. The key is (broker, topic, group, partition).
         val key = KafkaPartitionKey(msgAndMetadata.topicInfo.brokerId, msgAndMetadata.topic,

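On the thread-pool sizing in the first hunk above: topics maps each topic name to the number of consumer threads to run for it, so topics.values.reduce(_ + _) gives the total number of MessageHandler runnables the fixed pool must accommodate. With illustrative values:

    val topics = Map("clicks" -> 2, "views" -> 3)  // topic -> consumer thread count
    val executorPool =
      Executors.newFixedThreadPool(topics.values.reduce(_ + _))  // pool of 5 threads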
File: NetworkInputDStream.scala

@@ -14,6 +14,8 @@ import akka.actor.{Props, Actor}
 import akka.pattern.ask
 import akka.dispatch.Await
 import akka.util.duration._
+import spark.streaming.util.{RecurringTimer, SystemClock}
+import java.util.concurrent.ArrayBlockingQueue

 abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : StreamingContext)
   extends InputDStream[T](ssc_) {
@@ -154,4 +156,77 @@ abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Seri
       tracker ! DeregisterReceiver(streamId, msg)
     }
   }
+
+  /**
+   * Batches objects created by a [[spark.streaming.NetworkReceiver]] and puts them into
+   * appropriately named blocks at regular intervals. This class starts two threads,
+   * one to periodically start a new batch and prepare the previous batch as a block,
+   * the other to push the blocks into the block manager.
+   */
+  class BlockGenerator(storageLevel: StorageLevel)
+    extends Serializable with Logging {
+
+    case class Block(id: String, iterator: Iterator[T], metadata: Any = null)
+
+    val clock = new SystemClock()
+    val blockInterval = 200L // batching interval, in milliseconds
+    val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer)
+    val blockStorageLevel = storageLevel
+    val blocksForPushing = new ArrayBlockingQueue[Block](1000)
+    val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
+
+    var currentBuffer = new ArrayBuffer[T]
+
+    def start() {
+      blockIntervalTimer.start()
+      blockPushingThread.start()
+      logInfo("Data handler started")
+    }
+
+    def stop() {
+      blockIntervalTimer.stop()
+      blockPushingThread.interrupt()
+      logInfo("Data handler stopped")
+    }
+
+    def += (obj: T) {
+      currentBuffer += obj
+    }
+
+    private def createBlock(blockId: String, iterator: Iterator[T]) : Block = {
+      new Block(blockId, iterator)
+    }
+
+    private def updateCurrentBuffer(time: Long) {
+      try {
+        val newBlockBuffer = currentBuffer
+        currentBuffer = new ArrayBuffer[T]
+        if (newBlockBuffer.size > 0) {
+          val blockId = "input-" + NetworkReceiver.this.streamId + "-" + (time - blockInterval)
+          val newBlock = createBlock(blockId, newBlockBuffer.toIterator)
+          blocksForPushing.add(newBlock)
+        }
+      } catch {
+        case ie: InterruptedException =>
+          logInfo("Block interval timer thread interrupted")
+        case e: Exception =>
+          NetworkReceiver.this.stop()
+      }
+    }
+
+    private def keepPushingBlocks() {
+      logInfo("Block pushing thread started")
+      try {
+        while(true) {
+          val block = blocksForPushing.take()
+          NetworkReceiver.this.pushBlock(block.id, block.iterator, block.metadata, storageLevel)
+        }
+      } catch {
+        case ie: InterruptedException =>
+          logInfo("Block pushing thread interrupted")
+        case e: Exception =>
+          NetworkReceiver.this.stop()
+      }
+    }
+  }
 }

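With BlockGenerator nested inside NetworkReceiver, a custom receiver no longer passes a reference to itself; it instantiates the generator directly and feeds it. A minimal sketch against the API shown above (MyReceiver and its record source are hypothetical):

    class MyReceiver(streamId: Int, storageLevel: StorageLevel)
      extends NetworkReceiver[String](streamId) {

      lazy val blockGenerator = new BlockGenerator(storageLevel)

      protected def onStart() {
        blockGenerator.start()
        // hypothetical source: push each arriving record into the generator
        fetchRecords().foreach { record => blockGenerator += record }
      }

      protected def onStop() {
        blockGenerator.stop()
      }

      // stand-in for a real data source
      private def fetchRecords(): Iterator[String] = Iterator("a", "b", "c")
    }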
File: SocketInputDStream.scala

@@ -29,7 +29,7 @@ class SocketReceiver[T: ClassManifest](
     storageLevel: StorageLevel
   ) extends NetworkReceiver[T](streamId) {

-  lazy protected val dataHandler = new BufferingBlockCreator(this, storageLevel)
+  lazy protected val blockGenerator = new BlockGenerator(storageLevel)

   override def getLocationPreference = None
@@ -37,16 +37,16 @@ class SocketReceiver[T: ClassManifest](
     logInfo("Connecting to " + host + ":" + port)
     val socket = new Socket(host, port)
     logInfo("Connected to " + host + ":" + port)
-    dataHandler.start()
+    blockGenerator.start()
     val iterator = bytesToObjects(socket.getInputStream())
     while(iterator.hasNext) {
       val obj = iterator.next
-      dataHandler += obj
+      blockGenerator += obj
     }
   }

   protected def onStop() {
-    dataHandler.stop()
+    blockGenerator.stop()
   }
 }
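SocketReceiver is parameterized by a bytesToObjects function that turns the socket's InputStream into an Iterator[T]; each yielded element is pushed into the generator, as the loop above shows. A line-oriented sketch of such a function (illustrative, not part of the commit):

    def linesToObjects(input: java.io.InputStream): Iterator[String] = {
      // read lines until the stream is exhausted
      val reader = new java.io.BufferedReader(
        new java.io.InputStreamReader(input, "UTF-8"))
      Iterator.continually(reader.readLine()).takeWhile(_ != null)
    }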