From 2ef993d159939e9dedf909991ec5c5789bdd3670 Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Wed, 2 Jan 2013 14:17:59 -0800
Subject: [PATCH] BufferingBlockCreator -> NetworkReceiver.BlockGenerator

---
 .../streaming/BufferingBlockCreator.scala       | 80 -------------------
 .../streaming/dstream/FlumeInputDStream.scala   | 10 +--
 .../streaming/dstream/KafkaInputDStream.scala   | 10 +--
 .../dstream/NetworkInputDStream.scala           | 75 +++++++++++++++++
 .../dstream/SocketInputDStream.scala            |  8 +-
 5 files changed, 89 insertions(+), 94 deletions(-)
 delete mode 100644 streaming/src/main/scala/spark/streaming/BufferingBlockCreator.scala

diff --git a/streaming/src/main/scala/spark/streaming/BufferingBlockCreator.scala b/streaming/src/main/scala/spark/streaming/BufferingBlockCreator.scala
deleted file mode 100644
index efd2e75d40..0000000000
--- a/streaming/src/main/scala/spark/streaming/BufferingBlockCreator.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-package spark.streaming
-
-import java.util.concurrent.ArrayBlockingQueue
-import scala.collection.mutable.ArrayBuffer
-import spark.Logging
-import spark.streaming.util.{RecurringTimer, SystemClock}
-import spark.storage.StorageLevel
-
-/**
- * Batches objects created by a [[spark.streaming.NetworkReceiver]] and puts them into
- * appropriately named blocks at regular intervals. This class starts two threads,
- * one to periodically start a new batch and prepare the previous batch of as a block,
- * the other to push the blocks into the block manager.
- */
-class BufferingBlockCreator[T](receiver: NetworkReceiver[T], storageLevel: StorageLevel)
-  extends Serializable with Logging {
-
-  case class Block(id: String, iterator: Iterator[T], metadata: Any = null)
-
-  val clock = new SystemClock()
-  val blockInterval = 200L
-  val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer)
-  val blockStorageLevel = storageLevel
-  val blocksForPushing = new ArrayBlockingQueue[Block](1000)
-  val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
-
-  var currentBuffer = new ArrayBuffer[T]
-
-  def start() {
-    blockIntervalTimer.start()
-    blockPushingThread.start()
-    logInfo("Data handler started")
-  }
-
-  def stop() {
-    blockIntervalTimer.stop()
-    blockPushingThread.interrupt()
-    logInfo("Data handler stopped")
-  }
-
-  def += (obj: T) {
-    currentBuffer += obj
-  }
-
-  private def createBlock(blockId: String, iterator: Iterator[T]) : Block = {
-    new Block(blockId, iterator)
-  }
-
-  private def updateCurrentBuffer(time: Long) {
-    try {
-      val newBlockBuffer = currentBuffer
-      currentBuffer = new ArrayBuffer[T]
-      if (newBlockBuffer.size > 0) {
-        val blockId = "input-" + receiver.streamId + "- " + (time - blockInterval)
-        val newBlock = createBlock(blockId, newBlockBuffer.toIterator)
-        blocksForPushing.add(newBlock)
-      }
-    } catch {
-      case ie: InterruptedException =>
-        logInfo("Block interval timer thread interrupted")
-      case e: Exception =>
-        receiver.stop()
-    }
-  }
-
-  private def keepPushingBlocks() {
-    logInfo("Block pushing thread started")
-    try {
-      while(true) {
-        val block = blocksForPushing.take()
-        receiver.pushBlock(block.id, block.iterator, block.metadata, storageLevel)
-      }
-    } catch {
-      case ie: InterruptedException =>
-        logInfo("Block pushing thread interrupted")
-      case e: Exception =>
-        receiver.stop()
-    }
-  }
-}
\ No newline at end of file
diff --git a/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala
index a6fa378d6e..ca70e72e56 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala
@@ -97,13 +97,13 @@ private[streaming] object SparkFlumeEvent {
 private[streaming]
 class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol {
   override def append(event : AvroFlumeEvent) : Status = {
-    receiver.dataHandler += SparkFlumeEvent.fromAvroFlumeEvent(event)
+    receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event)
     Status.OK
   }

   override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = {
     events.foreach (event =>
-      receiver.dataHandler += SparkFlumeEvent.fromAvroFlumeEvent(event))
+      receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event))
     Status.OK
   }
 }
@@ -118,19 +118,19 @@ class FlumeReceiver(
     storageLevel: StorageLevel
   ) extends NetworkReceiver[SparkFlumeEvent](streamId) {

-  lazy val dataHandler = new BufferingBlockCreator(this, storageLevel)
+  lazy val blockGenerator = new BlockGenerator(storageLevel)

   protected override def onStart() {
     val responder = new SpecificResponder(
       classOf[AvroSourceProtocol], new FlumeEventServer(this));
     val server = new NettyServer(responder, new InetSocketAddress(host, port));
-    dataHandler.start()
+    blockGenerator.start()
     server.start()
     logInfo("Flume receiver started")
   }

   protected override def onStop() {
-    dataHandler.stop()
+    blockGenerator.stop()
     logInfo("Flume receiver stopped")
   }

diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
index b1941fb427..25988a2ce7 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
@@ -110,19 +110,19 @@ class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String,
   val ZK_TIMEOUT = 10000

   // Handles pushing data into the BlockManager
-  lazy protected val dataHandler = new BufferingBlockCreator(this, storageLevel)
+  lazy protected val blockGenerator = new BlockGenerator(storageLevel)
   // Keeps track of the current offsets. Maps from (broker, topic, group, part) -> Offset
   lazy val offsets = HashMap[KafkaPartitionKey, Long]()
   // Connection to Kafka
   var consumerConnector : ZookeeperConsumerConnector = null

   def onStop() {
-    dataHandler.stop()
+    blockGenerator.stop()
   }

   def onStart() {

-    dataHandler.start()
+    blockGenerator.start()

     // In case we are using multiple Threads to handle Kafka Messages
     val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _))
@@ -170,8 +170,8 @@ class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String,
   private class MessageHandler(stream: KafkaStream[String]) extends Runnable {
     def run() {
       logInfo("Starting MessageHandler.")
-      stream.takeWhile { msgAndMetadata =>
-        dataHandler += msgAndMetadata.message
+      stream.takeWhile { msgAndMetadata =>
+        blockGenerator += msgAndMetadata.message

        // Updating the offet. The key is (broker, topic, group, partition).
        val key = KafkaPartitionKey(msgAndMetadata.topicInfo.brokerId, msgAndMetadata.topic,
diff --git a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
index 41276da8bb..18e62a0e33 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
@@ -14,6 +14,8 @@ import akka.actor.{Props, Actor}
 import akka.pattern.ask
 import akka.dispatch.Await
 import akka.util.duration._
+import spark.streaming.util.{RecurringTimer, SystemClock}
+import java.util.concurrent.ArrayBlockingQueue

 abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : StreamingContext)
   extends InputDStream[T](ssc_) {
@@ -154,4 +156,77 @@ abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Seri
       tracker ! DeregisterReceiver(streamId, msg)
     }
   }
+
+  /**
+   * Batches objects created by a [[spark.streaming.NetworkReceiver]] and puts them into
+   * appropriately named blocks at regular intervals. This class starts two threads,
+   * one to periodically start a new batch and prepare the previous batch of as a block,
+   * the other to push the blocks into the block manager.
+   */
+  class BlockGenerator(storageLevel: StorageLevel)
+    extends Serializable with Logging {
+
+    case class Block(id: String, iterator: Iterator[T], metadata: Any = null)
+
+    val clock = new SystemClock()
+    val blockInterval = 200L
+    val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer)
+    val blockStorageLevel = storageLevel
+    val blocksForPushing = new ArrayBlockingQueue[Block](1000)
+    val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
+
+    var currentBuffer = new ArrayBuffer[T]
+
+    def start() {
+      blockIntervalTimer.start()
+      blockPushingThread.start()
+      logInfo("Data handler started")
+    }
+
+    def stop() {
+      blockIntervalTimer.stop()
+      blockPushingThread.interrupt()
+      logInfo("Data handler stopped")
+    }
+
+    def += (obj: T) {
+      currentBuffer += obj
+    }
+
+    private def createBlock(blockId: String, iterator: Iterator[T]) : Block = {
+      new Block(blockId, iterator)
+    }
+
+    private def updateCurrentBuffer(time: Long) {
+      try {
+        val newBlockBuffer = currentBuffer
+        currentBuffer = new ArrayBuffer[T]
+        if (newBlockBuffer.size > 0) {
+          val blockId = "input-" + NetworkReceiver.this.streamId + "- " + (time - blockInterval)
+          val newBlock = createBlock(blockId, newBlockBuffer.toIterator)
+          blocksForPushing.add(newBlock)
+        }
+      } catch {
+        case ie: InterruptedException =>
+          logInfo("Block interval timer thread interrupted")
+        case e: Exception =>
+          NetworkReceiver.this.stop()
+      }
+    }
+
+    private def keepPushingBlocks() {
+      logInfo("Block pushing thread started")
+      try {
+        while(true) {
+          val block = blocksForPushing.take()
+          NetworkReceiver.this.pushBlock(block.id, block.iterator, block.metadata, storageLevel)
+        }
+      } catch {
+        case ie: InterruptedException =>
+          logInfo("Block pushing thread interrupted")
+        case e: Exception =>
+          NetworkReceiver.this.stop()
+      }
+    }
+  }
 }
diff --git a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
index 8374f131d6..8e4b20ea4c 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
@@ -29,7 +29,7 @@ class SocketReceiver[T: ClassManifest](
     storageLevel: StorageLevel
   ) extends NetworkReceiver[T](streamId) {

-  lazy protected val dataHandler = new BufferingBlockCreator(this, storageLevel)
+  lazy protected val blockGenerator = new BlockGenerator(storageLevel)

   override def getLocationPreference = None

@@ -37,16 +37,16 @@ class SocketReceiver[T: ClassManifest](
     logInfo("Connecting to " + host + ":" + port)
     val socket = new Socket(host, port)
     logInfo("Connected to " + host + ":" + port)
-    dataHandler.start()
+    blockGenerator.start()
     val iterator = bytesToObjects(socket.getInputStream())
     while(iterator.hasNext) {
       val obj = iterator.next
-      dataHandler += obj
+      blockGenerator += obj
     }
   }

   protected def onStop() {
-    dataHandler.stop()
+    blockGenerator.stop()
   }

 }
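
Reviewer note (not part of the patch): a minimal sketch, under stated assumptions, of how a custom receiver would use the new NetworkReceiver.BlockGenerator inner class. The LineReceiver class below and its socket-reading logic are hypothetical illustrations; only the blockGenerator.start(), += and stop() calls mirror the receivers touched by this patch (FlumeReceiver, KafkaReceiver, SocketReceiver).

  // Hypothetical example only -- assumes it lives in spark.streaming.dstream
  // alongside the receivers above.
  import java.io.{BufferedReader, InputStreamReader}
  import java.net.Socket
  import spark.storage.StorageLevel

  class LineReceiver(streamId: Int, host: String, port: Int, storageLevel: StorageLevel)
    extends NetworkReceiver[String](streamId) {

    // BlockGenerator is now an inner class, so no receiver reference is passed in;
    // it reaches the enclosing receiver through NetworkReceiver.this.
    lazy protected val blockGenerator = new BlockGenerator(storageLevel)

    protected def onStart() {
      blockGenerator.start()          // starts the block-interval timer and pushing thread
      val reader = new BufferedReader(
        new InputStreamReader(new Socket(host, port).getInputStream()))
      var line = reader.readLine()
      while (line != null) {
        blockGenerator += line        // buffered, cut into a named block every 200 ms
        line = reader.readLine()
      }
    }

    protected def onStop() {
      blockGenerator.stop()
    }
  }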