Kafka Stream comments.

This commit is contained in:
Denny 2012-11-11 11:06:49 -08:00
parent 2e8f2ee4ad
commit d006109e95
5 changed files with 99 additions and 53 deletions

View file

@ -17,6 +17,7 @@ import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
case class DStreamCheckpointData(rdds: HashMap[Time, Any])
abstract class DStream[T: ClassManifest] (@transient var ssc: StreamingContext)
@ -61,7 +62,7 @@ extends Serializable with Logging {
// Checkpoint details
protected[streaming] val mustCheckpoint = false
protected[streaming] var checkpointInterval: Time = null
protected[streaming] var checkpointData = DStreamCheckpointData(HashMap[Time, Any]())
protected[streaming] var checkpointData = new DStreamCheckpointData(HashMap[Time, Any]())
// Reference to whole DStream graph
protected[streaming] var graph: DStreamGraph = null
@ -286,7 +287,9 @@ extends Serializable with Logging {
* This methd should be overwritten by sublcasses of InputDStream.
*/
protected[streaming] def addMetadata(metadata: Any) {
logInfo("Dropping Metadata: " + metadata.toString)
if (metadata != null) {
logInfo("Dropping Metadata: " + metadata.toString)
}
}
/**

View file

@ -102,6 +102,18 @@ final class StreamingContext (
private[streaming] def getNewNetworkStreamId() = nextNetworkInputStreamId.getAndIncrement()
/**
* Create an input stream that pulls messages form a Kafka Broker.
*
* @param host Zookeper hostname.
* @param port Zookeper port.
* @param groupId The group id for this consumer.
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
* @param initialOffsets Optional initial offsets for each of the partitions to consume.
* By default the value is pulled from zookeper.
* @param storageLevel RDD storage level. Defaults to memory-only.
*/
def kafkaStream[T: ClassManifest](
hostname: String,
port: Int,

View file

@ -3,34 +3,38 @@ package spark.streaming.examples
import spark.streaming._
import spark.streaming.StreamingContext._
import spark.storage.StorageLevel
import WordCount2_ExtraFunctions._
object KafkaWordCount {
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println("Usage: WordCountNetwork <master> <hostname> <port>")
if (args.length < 4) {
System.err.println("Usage: KafkaWordCount <master> <hostname> <port> <restore>")
System.exit(1)
}
// Create the context and set the batch size
val ssc = new StreamingContext(args(0), "WordCountNetwork")
ssc.setBatchDuration(Seconds(2))
val ssc = args(3) match {
// Restore the stream from a checkpoint
case "true" =>
new StreamingContext("work/checkpoint")
case _ =>
val tmp = new StreamingContext(args(0), "KafkaWordCount")
// Create a NetworkInputDStream on target ip:port and count the
// words in input stream of \n delimited test (eg. generated by 'nc')
ssc.checkpoint("checkpoint", Time(1000 * 5))
val lines = ssc.kafkaStream[String](args(1), args(2).toInt, "test_group", Map("test" -> 1),
Map(KafkaPartitionKey(0, "test", "test_group", 0) -> 2382))
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
tmp.setBatchDuration(Seconds(2))
tmp.checkpoint("work/checkpoint", Seconds(10))
val lines = tmp.kafkaStream[String](args(1), args(2).toInt, "test_group", Map("test" -> 1),
Map(KafkaPartitionKey(0,"test","test_group",0) -> 0l))
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1l)).reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2)
wordCounts.persist().checkpoint(Seconds(10))
wordCounts.print()
tmp
}
ssc.start()
// Wait for 12 seconds
Thread.sleep(12000)
ssc.stop()
val newSsc = new StreamingContext("checkpoint")
newSsc.start()
}
}

View file

@ -1,15 +1,11 @@
package spark.streaming
import java.lang.reflect.Method
import java.nio.ByteBuffer
import java.util.Properties
import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap, Executors}
import kafka.api.{FetchRequest}
import java.util.concurrent.Executors
import kafka.consumer._
import kafka.cluster.Partition
import kafka.message.{Message, MessageSet, MessageAndMetadata}
import kafka.serializer.StringDecoder
import kafka.utils.{Pool, Utils, ZKGroupTopicDirs}
import kafka.utils.{Utils, ZKGroupTopicDirs}
import kafka.utils.ZkUtils._
import scala.collection.mutable.HashMap
import scala.collection.JavaConversions._
@ -17,14 +13,25 @@ import spark._
import spark.RDD
import spark.storage.StorageLevel
// Key for a specific Kafka Partition: (broker, topic, group, part)
case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, partId: Int)
// Metadata for a Kafka Stream that it sent to the Master
case class KafkaInputDStreamMetadata(timestamp: Long, data: Map[KafkaPartitionKey, Long])
// Checkpoint data specific to a KafkaInputDstream
case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any],
savedOffsets: HashMap[Long, Map[KafkaPartitionKey, Long]]) extends DStreamCheckpointData(kafkaRdds)
savedOffsets: Map[KafkaPartitionKey, Long]) extends DStreamCheckpointData(kafkaRdds)
/**
* Input stream that pulls messages form a Kafka Broker.
*
* @param host Zookeper hostname.
* @param port Zookeper port.
* @param groupId The group id for this consumer.
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
* @param initialOffsets Optional initial offsets for each of the partitions to consume.
* By default the value is pulled from zookeper.
* @param storageLevel RDD storage level.
*/
class KafkaInputDStream[T: ClassManifest](
@transient ssc_ : StreamingContext,
@ -36,21 +43,31 @@ class KafkaInputDStream[T: ClassManifest](
storageLevel: StorageLevel
) extends NetworkInputDStream[T](ssc_ ) with Logging {
// Metadata that keeps track of which messages have already been consumed.
var savedOffsets = HashMap[Long, Map[KafkaPartitionKey, Long]]()
// In case of a failure, the offets for a particular timestamp will be restored.
@transient var restoredOffsets : Map[KafkaPartitionKey, Long] = null
override protected[streaming] def addMetadata(metadata: Any) {
metadata match {
case x : KafkaInputDStreamMetadata =>
case x : KafkaInputDStreamMetadata =>
savedOffsets(x.timestamp) = x.data
logInfo("Saved Offsets: " + savedOffsets)
// TOOD: Remove logging
logInfo("New saved Offsets: " + savedOffsets)
case _ => logInfo("Received unknown metadata: " + metadata.toString)
}
}
override protected[streaming] def updateCheckpointData(currentTime: Time) {
super.updateCheckpointData(currentTime)
logInfo("Updating KafkaDStream checkpoint data: " + savedOffsets.toString)
checkpointData = KafkaDStreamCheckpointData(checkpointData.rdds, savedOffsets)
if(savedOffsets.size > 0) {
// Find the offets that were stored before the checkpoint was initiated
val key = savedOffsets.keys.toList.sortWith(_ < _).filter(_ < currentTime.millis).last
val latestOffsets = savedOffsets(key)
logInfo("Updating KafkaDStream checkpoint data: " + latestOffsets.toString)
checkpointData = KafkaDStreamCheckpointData(checkpointData.rdds, latestOffsets)
savedOffsets.clear()
}
}
override protected[streaming] def restoreCheckpointData() {
@ -58,14 +75,21 @@ class KafkaInputDStream[T: ClassManifest](
logInfo("Restoring KafkaDStream checkpoint data.")
checkpointData match {
case x : KafkaDStreamCheckpointData =>
savedOffsets = x.savedOffsets
logInfo("Restored KafkaDStream offsets: " + savedOffsets.toString)
restoredOffsets = x.savedOffsets
logInfo("Restored KafkaDStream offsets: " + savedOffsets)
}
}
def createReceiver(): NetworkReceiver[T] = {
new KafkaReceiver(id, host, port, groupId, topics, initialOffsets, storageLevel)
.asInstanceOf[NetworkReceiver[T]]
// We have restored from a checkpoint, use the restored offsets
if (restoredOffsets != null) {
new KafkaReceiver(id, host, port, groupId, topics, restoredOffsets, storageLevel)
.asInstanceOf[NetworkReceiver[T]]
} else {
new KafkaReceiver(id, host, port, groupId, topics, initialOffsets, storageLevel)
.asInstanceOf[NetworkReceiver[T]]
}
}
}
@ -96,27 +120,28 @@ class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String,
val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _))
val zooKeeperEndPoint = host + ":" + port
logInfo("Starting Kafka Consumer Stream in group " + groupId)
logInfo("Starting Kafka Consumer Stream with group: " + groupId)
logInfo("Initial offsets: " + initialOffsets.toString)
logInfo("Connecting to " + zooKeeperEndPoint)
// Specify some Consumer properties
// Zookeper connection properties
val props = new Properties()
props.put("zk.connect", zooKeeperEndPoint)
props.put("zk.connectiontimeout.ms", ZK_TIMEOUT.toString)
props.put("groupid", groupId)
// Create the connection to the cluster
logInfo("Connecting to Zookeper: " + zooKeeperEndPoint)
val consumerConfig = new ConsumerConfig(props)
consumerConnector = Consumer.create(consumerConfig).asInstanceOf[ZookeeperConsumerConnector]
logInfo("Connected to " + zooKeeperEndPoint)
// Reset the Kafka offsets in case we are recovering from a failure
resetOffsets(initialOffsets)
logInfo("Connected to " + zooKeeperEndPoint)
// Create Threads for each Topic/Message Stream we are listening
val topicMessageStreams = consumerConnector.createMessageStreams(topics, new StringDecoder())
// Start the messages handler for each partition
topicMessageStreams.values.foreach { streams =>
streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
}
@ -133,19 +158,20 @@ class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String,
}
}
// Responsible for handling Kafka Messages
class MessageHandler(stream: KafkaStream[String]) extends Runnable {
// Handles Kafka Messages
private class MessageHandler(stream: KafkaStream[String]) extends Runnable {
def run() {
logInfo("Starting MessageHandler.")
stream.takeWhile { msgAndMetadata =>
dataHandler += msgAndMetadata.message
// Updating the offet. The key is (topic, partitionID).
// Updating the offet. The key is (broker, topic, group, partition).
val key = KafkaPartitionKey(msgAndMetadata.topicInfo.brokerId, msgAndMetadata.topic,
groupId, msgAndMetadata.topicInfo.partition.partId)
val offset = msgAndMetadata.topicInfo.getConsumeOffset
offsets.put(key, offset)
logInfo((key, offset).toString)
// TODO: Remove Logging
logInfo("Handled message: " + (key, offset).toString)
// Keep on handling messages
true
@ -157,6 +183,7 @@ class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String,
extends DataHandler[Any](receiver, storageLevel) {
override def createBlock(blockId: String, iterator: Iterator[Any]) : Block = {
// Creates a new Block with Kafka-specific Metadata
new Block(blockId, iterator, KafkaInputDStreamMetadata(System.currentTimeMillis, offsets.toMap))
}

View file

@ -59,9 +59,9 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// then check whether some RDD has been checkpointed or not
ssc.start()
runStreamsWithRealDelay(ssc, firstNumBatches)
logInfo("Checkpoint data of state stream = \n[" + stateStream.checkpointData.mkString(",\n") + "]")
assert(!stateStream.checkpointData.isEmpty, "No checkpointed RDDs in state stream before first failure")
stateStream.checkpointData.foreach {
logInfo("Checkpoint data of state stream = \n[" + stateStream.checkpointData.rdds.mkString(",\n") + "]")
assert(!stateStream.checkpointData.rdds.isEmpty, "No checkpointed RDDs in state stream before first failure")
stateStream.checkpointData.rdds.foreach {
case (time, data) => {
val file = new File(data.toString)
assert(file.exists(), "Checkpoint file '" + file +"' for time " + time + " for state stream before first failure does not exist")
@ -70,7 +70,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Run till a further time such that previous checkpoint files in the stream would be deleted
// and check whether the earlier checkpoint files are deleted
val checkpointFiles = stateStream.checkpointData.map(x => new File(x._2.toString))
val checkpointFiles = stateStream.checkpointData.rdds.map(x => new File(x._2.toString))
runStreamsWithRealDelay(ssc, secondNumBatches)
checkpointFiles.foreach(file => assert(!file.exists, "Checkpoint file '" + file + "' was not deleted"))
ssc.stop()
@ -87,8 +87,8 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// is present in the checkpoint data or not
ssc.start()
runStreamsWithRealDelay(ssc, 1)
assert(!stateStream.checkpointData.isEmpty, "No checkpointed RDDs in state stream before second failure")
stateStream.checkpointData.foreach {
assert(!stateStream.checkpointData.rdds.isEmpty, "No checkpointed RDDs in state stream before second failure")
stateStream.checkpointData.rdds.foreach {
case (time, data) => {
val file = new File(data.toString)
assert(file.exists(),