From d006109e9504b3221de3a15f9bfee96dafa8b593 Mon Sep 17 00:00:00 2001
From: Denny
Date: Sun, 11 Nov 2012 11:06:49 -0800
Subject: [PATCH] Kafka Stream comments.

---
 .../main/scala/spark/streaming/DStream.scala  |  7 +-
 .../spark/streaming/StreamingContext.scala    | 12 +++
 .../streaming/examples/KafkaWordCount.scala   | 44 ++++++-----
 .../streaming/input/KafkaInputDStream.scala   | 77 +++++++++++++------
 .../spark/streaming/CheckpointSuite.scala     | 12 +--
 5 files changed, 99 insertions(+), 53 deletions(-)

diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 3219919a24..b8324d11a3 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -17,6 +17,7 @@ import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.conf.Configuration
 
+
 case class DStreamCheckpointData(rdds: HashMap[Time, Any])
 
 abstract class DStream[T: ClassManifest] (@transient var ssc: StreamingContext)
@@ -61,7 +62,7 @@ extends Serializable with Logging {
   // Checkpoint details
   protected[streaming] val mustCheckpoint = false
   protected[streaming] var checkpointInterval: Time = null
-  protected[streaming] var checkpointData = DStreamCheckpointData(HashMap[Time, Any]())
+  protected[streaming] var checkpointData = new DStreamCheckpointData(HashMap[Time, Any]())
 
   // Reference to whole DStream graph
   protected[streaming] var graph: DStreamGraph = null
@@ -286,7 +287,9 @@ extends Serializable with Logging {
    * This method should be overwritten by subclasses of InputDStream.
    */
   protected[streaming] def addMetadata(metadata: Any) {
-    logInfo("Dropping Metadata: " + metadata.toString)
+    if (metadata != null) {
+      logInfo("Dropping Metadata: " + metadata.toString)
+    }
   }
 
   /**
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index d68d2632e7..e87d0cb7c8 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -102,6 +102,18 @@ final class StreamingContext (
 
   private[streaming] def getNewNetworkStreamId() = nextNetworkInputStreamId.getAndIncrement()
 
+  /**
+   * Create an input stream that pulls messages from a Kafka Broker.
+   *
+   * @param hostname Zookeeper hostname.
+   * @param port Zookeeper port.
+   * @param groupId The group id for this consumer.
+   * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+   * in its own thread.
+   * @param initialOffsets Optional initial offsets for each of the partitions to consume.
+   * By default the value is pulled from Zookeeper.
+   * @param storageLevel RDD storage level. Defaults to memory-only.
+   */
   def kafkaStream[T: ClassManifest](
     hostname: String,
     port: Int,
diff --git a/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala b/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
index 655f9627b3..1e92cbb210 100644
--- a/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
@@ -3,34 +3,38 @@ package spark.streaming.examples
 import spark.streaming._
 import spark.streaming.StreamingContext._
 import spark.storage.StorageLevel
+import WordCount2_ExtraFunctions._
 
 object KafkaWordCount {
   def main(args: Array[String]) {
-    if (args.length < 2) {
-      System.err.println("Usage: WordCountNetwork ")
+
+    if (args.length < 4) {
+      System.err.println("Usage: KafkaWordCount ")
       System.exit(1)
     }
 
-    // Create the context and set the batch size
-    val ssc = new StreamingContext(args(0), "WordCountNetwork")
-    ssc.setBatchDuration(Seconds(2))
+    val ssc = args(3) match {
+      // Restore the stream from a checkpoint
+      case "true" =>
+        new StreamingContext("work/checkpoint")
+      case _ =>
+        val tmp = new StreamingContext(args(0), "KafkaWordCount")
 
-    // Create a NetworkInputDStream on target ip:port and count the
-    // words in input stream of \n delimited test (eg. generated by 'nc')
-    ssc.checkpoint("checkpoint", Time(1000 * 5))
-    val lines = ssc.kafkaStream[String](args(1), args(2).toInt, "test_group", Map("test" -> 1),
-      Map(KafkaPartitionKey(0, "test", "test_group", 0) -> 2382))
-    val words = lines.flatMap(_.split(" "))
-    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
-    wordCounts.print()
+        tmp.setBatchDuration(Seconds(2))
+        tmp.checkpoint("work/checkpoint", Seconds(10))
+
+        val lines = tmp.kafkaStream[String](args(1), args(2).toInt, "test_group", Map("test" -> 1),
+          Map(KafkaPartitionKey(0,"test","test_group",0) -> 0l))
+        val words = lines.flatMap(_.split(" "))
+        val wordCounts = words.map(x => (x, 1l)).reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2)
+
+        wordCounts.persist().checkpoint(Seconds(10))
+        wordCounts.print()
+
+        tmp
+    }
 
     ssc.start()
-    // Wait for 12 seconds
-    Thread.sleep(12000)
-    ssc.stop()
-
-    val newSsc = new StreamingContext("checkpoint")
-    newSsc.start()
-
   }
 }
+
diff --git a/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala
index 814f2706d6..ad8e86a094 100644
--- a/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala
@@ -1,15 +1,11 @@
 package spark.streaming
 
-import java.lang.reflect.Method
-import java.nio.ByteBuffer
 import java.util.Properties
-import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap, Executors}
-import kafka.api.{FetchRequest}
+import java.util.concurrent.Executors
 import kafka.consumer._
-import kafka.cluster.Partition
 import kafka.message.{Message, MessageSet, MessageAndMetadata}
 import kafka.serializer.StringDecoder
-import kafka.utils.{Pool, Utils, ZKGroupTopicDirs}
+import kafka.utils.{Utils, ZKGroupTopicDirs}
 import kafka.utils.ZkUtils._
 import scala.collection.mutable.HashMap
 import scala.collection.JavaConversions._
@@ -17,14 +13,25 @@ import spark._
 import spark.RDD
 import spark.storage.StorageLevel
 
-
+// Key for a specific Kafka Partition: (broker, topic, group, part)
 case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, partId: Int)
+// Metadata for a Kafka Stream that is sent to the Master
 case class KafkaInputDStreamMetadata(timestamp: Long, data: Map[KafkaPartitionKey, Long])
+// Checkpoint data specific to a KafkaInputDStream
 case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any],
-  savedOffsets: HashMap[Long, Map[KafkaPartitionKey, Long]]) extends DStreamCheckpointData(kafkaRdds)
+  savedOffsets: Map[KafkaPartitionKey, Long]) extends DStreamCheckpointData(kafkaRdds)
 
 /**
  * Input stream that pulls messages from a Kafka Broker.
+ *
+ * @param host Zookeeper hostname.
+ * @param port Zookeeper port.
+ * @param groupId The group id for this consumer.
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread.
+ * @param initialOffsets Optional initial offsets for each of the partitions to consume.
+ * By default the value is pulled from Zookeeper.
+ * @param storageLevel RDD storage level.
  */
 class KafkaInputDStream[T: ClassManifest](
   @transient ssc_ : StreamingContext,
@@ -36,21 +43,31 @@ class KafkaInputDStream[T: ClassManifest](
   storageLevel: StorageLevel
   ) extends NetworkInputDStream[T](ssc_ ) with Logging {
 
+  // Metadata that keeps track of which messages have already been consumed.
   var savedOffsets = HashMap[Long, Map[KafkaPartitionKey, Long]]()
+  // In case of a failure, the offsets for a particular timestamp will be restored.
+  @transient var restoredOffsets : Map[KafkaPartitionKey, Long] = null
 
   override protected[streaming] def addMetadata(metadata: Any) {
     metadata match {
-      case x : KafkaInputDStreamMetadata =>
+      case x : KafkaInputDStreamMetadata =>
         savedOffsets(x.timestamp) = x.data
-        logInfo("Saved Offsets: " + savedOffsets)
+        // TODO: Remove logging
+        logInfo("New saved Offsets: " + savedOffsets)
       case _ => logInfo("Received unknown metadata: " + metadata.toString)
     }
   }
 
   override protected[streaming] def updateCheckpointData(currentTime: Time) {
     super.updateCheckpointData(currentTime)
-    logInfo("Updating KafkaDStream checkpoint data: " + savedOffsets.toString)
-    checkpointData = KafkaDStreamCheckpointData(checkpointData.rdds, savedOffsets)
+    if(savedOffsets.size > 0) {
+      // Find the offsets that were stored before the checkpoint was initiated
+      val key = savedOffsets.keys.toList.sortWith(_ < _).filter(_ < currentTime.millis).last
+      val latestOffsets = savedOffsets(key)
+      logInfo("Updating KafkaDStream checkpoint data: " + latestOffsets.toString)
+      checkpointData = KafkaDStreamCheckpointData(checkpointData.rdds, latestOffsets)
+      savedOffsets.clear()
+    }
   }
 
   override protected[streaming] def restoreCheckpointData() {
@@ -58,14 +75,21 @@ class KafkaInputDStream[T: ClassManifest](
     logInfo("Restoring KafkaDStream checkpoint data.")
     checkpointData match {
      case x : KafkaDStreamCheckpointData =>
-        savedOffsets = x.savedOffsets
-        logInfo("Restored KafkaDStream offsets: " + savedOffsets.toString)
+        restoredOffsets = x.savedOffsets
+        logInfo("Restored KafkaDStream offsets: " + savedOffsets)
     }
   }
 
   def createReceiver(): NetworkReceiver[T] = {
-    new KafkaReceiver(id, host, port, groupId, topics, initialOffsets, storageLevel)
-      .asInstanceOf[NetworkReceiver[T]]
+    // We have restored from a checkpoint, use the restored offsets
+    if (restoredOffsets != null) {
+      new KafkaReceiver(id, host, port, groupId, topics, restoredOffsets, storageLevel)
+        .asInstanceOf[NetworkReceiver[T]]
+    } else {
+      new KafkaReceiver(id, host, port, groupId, topics, initialOffsets, storageLevel)
+        .asInstanceOf[NetworkReceiver[T]]
+    }
+
   }
 
 }
@@ -96,27 +120,28 @@ class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String,
     val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _))
     val zooKeeperEndPoint = host + ":" + port
 
-    logInfo("Starting Kafka Consumer Stream in group " + groupId)
+    logInfo("Starting Kafka Consumer Stream with group: " + groupId)
     logInfo("Initial offsets: " + initialOffsets.toString)
-    logInfo("Connecting to " + zooKeeperEndPoint)
-    // Specify some Consumer properties
+
+    // Zookeeper connection properties
     val props = new Properties()
     props.put("zk.connect", zooKeeperEndPoint)
     props.put("zk.connectiontimeout.ms", ZK_TIMEOUT.toString)
     props.put("groupid", groupId)
 
     // Create the connection to the cluster
+    logInfo("Connecting to Zookeeper: " + zooKeeperEndPoint)
     val consumerConfig = new ConsumerConfig(props)
     consumerConnector = Consumer.create(consumerConfig).asInstanceOf[ZookeeperConsumerConnector]
+    logInfo("Connected to " + zooKeeperEndPoint)
 
     // Reset the Kafka offsets in case we are recovering from a failure
     resetOffsets(initialOffsets)
-
-    logInfo("Connected to " + zooKeeperEndPoint)
 
     // Create Threads for each Topic/Message Stream we are listening to
     val topicMessageStreams = consumerConnector.createMessageStreams(topics, new StringDecoder())
+    // Start the message handler for each partition
     topicMessageStreams.values.foreach { streams =>
       streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
    }
@@ -133,19 +158,20 @@ class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String,
     }
   }
 
-  // Responsible for handling Kafka Messages
-  class MessageHandler(stream: KafkaStream[String]) extends Runnable {
+  // Handles Kafka Messages
+  private class MessageHandler(stream: KafkaStream[String]) extends Runnable {
     def run() {
       logInfo("Starting MessageHandler.")
       stream.takeWhile { msgAndMetadata =>
         dataHandler += msgAndMetadata.message
 
-        // Updating the offet. The key is (topic, partitionID).
+        // Updating the offset. The key is (broker, topic, group, partition).
         val key = KafkaPartitionKey(msgAndMetadata.topicInfo.brokerId, msgAndMetadata.topic,
           groupId, msgAndMetadata.topicInfo.partition.partId)
         val offset = msgAndMetadata.topicInfo.getConsumeOffset
         offsets.put(key, offset)
-        logInfo((key, offset).toString)
+        // TODO: Remove Logging
+        logInfo("Handled message: " + (key, offset).toString)
 
         // Keep on handling messages
         true
@@ -157,6 +183,7 @@ class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String,
     extends DataHandler[Any](receiver, storageLevel) {
 
     override def createBlock(blockId: String, iterator: Iterator[Any]) : Block = {
+      // Creates a new Block with Kafka-specific Metadata
       new Block(blockId, iterator, KafkaInputDStreamMetadata(System.currentTimeMillis, offsets.toMap))
     }
 
diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
index 038827ddb0..0450120061 100644
--- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
@@ -59,9 +59,9 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
     // then check whether some RDD has been checkpointed or not
     ssc.start()
     runStreamsWithRealDelay(ssc, firstNumBatches)
-    logInfo("Checkpoint data of state stream = \n[" + stateStream.checkpointData.mkString(",\n") + "]")
-    assert(!stateStream.checkpointData.isEmpty, "No checkpointed RDDs in state stream before first failure")
-    stateStream.checkpointData.foreach {
+    logInfo("Checkpoint data of state stream = \n[" + stateStream.checkpointData.rdds.mkString(",\n") + "]")
+    assert(!stateStream.checkpointData.rdds.isEmpty, "No checkpointed RDDs in state stream before first failure")
+    stateStream.checkpointData.rdds.foreach {
       case (time, data) => {
         val file = new File(data.toString)
         assert(file.exists(), "Checkpoint file '" + file +"' for time " + time + " for state stream before first failure does not exist")
@@ -70,7 +70,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
 
     // Run till a further time such that previous checkpoint files in the stream would be deleted
     // and check whether the earlier checkpoint files are deleted
-    val checkpointFiles = stateStream.checkpointData.map(x => new File(x._2.toString))
+    val checkpointFiles = stateStream.checkpointData.rdds.map(x => new File(x._2.toString))
     runStreamsWithRealDelay(ssc, secondNumBatches)
     checkpointFiles.foreach(file => assert(!file.exists, "Checkpoint file '" + file + "' was not deleted"))
     ssc.stop()
@@ -87,8 +87,8 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
     // is present in the checkpoint data or not
     ssc.start()
     runStreamsWithRealDelay(ssc, 1)
-    assert(!stateStream.checkpointData.isEmpty, "No checkpointed RDDs in state stream before second failure")
-    stateStream.checkpointData.foreach {
+    assert(!stateStream.checkpointData.rdds.isEmpty, "No checkpointed RDDs in state stream before second failure")
+    stateStream.checkpointData.rdds.foreach {
       case (time, data) => {
         val file = new File(data.toString)
         assert(file.exists(),
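
Usage note (not part of the commit above): the following is a minimal driver sketched from the KafkaWordCount example in this patch, showing how the new kafkaStream overload and checkpoint-based recovery are meant to fit together. The object name KafkaStreamSketch, the master URL "local[2]", and the Zookeeper endpoint localhost:2181 are placeholders of my own; "work/checkpoint", the "test"/"test_group" names, and the initial offset entry for partition 0 come straight from the example.

import spark.streaming._
import spark.streaming.StreamingContext._

object KafkaStreamSketch {
  def main(args: Array[String]) {
    // Pass "true" to restore the context (and its Kafka offsets) from the
    // checkpoint directory, mirroring the restore flag in KafkaWordCount.
    val ssc = if (args.length > 0 && args(0) == "true") {
      new StreamingContext("work/checkpoint")
    } else {
      val tmp = new StreamingContext("local[2]", "KafkaStreamSketch")
      tmp.setBatchDuration(Seconds(2))
      tmp.checkpoint("work/checkpoint", Seconds(10))

      // One consumer thread for the single partition of topic "test",
      // starting from offset 0 for (broker 0, "test", "test_group", partition 0).
      val lines = tmp.kafkaStream[String]("localhost", 2181, "test_group",
        Map("test" -> 1), Map(KafkaPartitionKey(0, "test", "test_group", 0) -> 0L))
      lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).print()
      tmp
    }
    ssc.start()
  }
}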
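
The comment added in updateCheckpointData ("Find the offsets that were stored before the checkpoint was initiated") is easiest to see in isolation. The self-contained sketch below, with made-up timestamps and offsets and a local stand-in for KafkaPartitionKey, reproduces just that selection step: take the most recent per-partition offsets that were recorded strictly before the checkpoint time.

import scala.collection.mutable.HashMap

object OffsetSelectionSketch {
  // Local stand-in for the case class defined in KafkaInputDStream.scala
  case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, partId: Int)

  def main(args: Array[String]) {
    // Batch timestamp -> offsets reported by the receiver for that batch (made-up values)
    val savedOffsets = HashMap[Long, Map[KafkaPartitionKey, Long]](
      1000L -> Map(KafkaPartitionKey(0, "test", "test_group", 0) -> 10L),
      2000L -> Map(KafkaPartitionKey(0, "test", "test_group", 0) -> 25L),
      3000L -> Map(KafkaPartitionKey(0, "test", "test_group", 0) -> 40L))

    val checkpointTime = 2500L  // hypothetical checkpoint time in milliseconds

    // Same selection as KafkaInputDStream.updateCheckpointData:
    // the latest batch recorded before the checkpoint was initiated (here 2000L)
    val key = savedOffsets.keys.toList.sortWith(_ < _).filter(_ < checkpointTime).last
    val latestOffsets = savedOffsets(key)
    println("Offsets to checkpoint: " + latestOffsets)
  }
}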