Merge pull request #372 from Reinvigorate/sm-kafka

Removing offset management code that is non-existent in kafka 0.7.0+
This commit is contained in:
Tathagata Das 2013-02-07 12:41:07 -08:00
commit 12300758cc
4 changed files with 24 additions and 99 deletions

View file

@ -13,19 +13,19 @@ import spark.streaming.util.RawTextHelper._
object KafkaWordCount {
def main(args: Array[String]) {
if (args.length < 6) {
System.err.println("Usage: KafkaWordCount <master> <hostname> <port> <group> <topics> <numThreads>")
if (args.length < 5) {
System.err.println("Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>")
System.exit(1)
}
val Array(master, hostname, port, group, topics, numThreads) = args
val Array(master, zkQuorum, group, topics, numThreads) = args
val sc = new SparkContext(master, "KafkaWordCount")
val ssc = new StreamingContext(sc, Seconds(2))
ssc.checkpoint("checkpoint")
val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
val lines = ssc.kafkaStream[String](hostname, port.toInt, group, topicpMap)
val lines = ssc.kafkaStream[String](zkQuorum, group, topicpMap)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1l)).reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2)
wordCounts.print()
@ -38,16 +38,16 @@ object KafkaWordCount {
object KafkaWordCountProducer {
def main(args: Array[String]) {
if (args.length < 3) {
System.err.println("Usage: KafkaWordCountProducer <hostname> <port> <topic> <messagesPerSec> <wordsPerMessage>")
if (args.length < 2) {
System.err.println("Usage: KafkaWordCountProducer <zkQuorum> <topic> <messagesPerSec> <wordsPerMessage>")
System.exit(1)
}
val Array(hostname, port, topic, messagesPerSec, wordsPerMessage) = args
val Array(zkQuorum, topic, messagesPerSec, wordsPerMessage) = args
// Zookeper connection properties
val props = new Properties()
props.put("zk.connect", hostname + ":" + port)
props.put("zk.connect", zkQuorum)
props.put("serializer.class", "kafka.serializer.StringEncoder")
val config = new ProducerConfig(props)

3
run
View file

@ -84,6 +84,9 @@ CLASSPATH+=":$CORE_DIR/src/main/resources"
CLASSPATH+=":$REPL_DIR/target/scala-$SCALA_VERSION/classes"
CLASSPATH+=":$EXAMPLES_DIR/target/scala-$SCALA_VERSION/classes"
CLASSPATH+=":$STREAMING_DIR/target/scala-$SCALA_VERSION/classes"
for jar in `find "$STREAMING_DIR/lib" -name '*jar'`; do
CLASSPATH+=":$jar"
done
if [ -e "$FWDIR/lib_managed" ]; then
CLASSPATH+=":$FWDIR/lib_managed/jars/*"
CLASSPATH+=":$FWDIR/lib_managed/bundles/*"

View file

@ -136,8 +136,7 @@ class StreamingContext private (
/**
* Create an input stream that pulls messages form a Kafka Broker.
* @param hostname Zookeper hostname.
* @param port Zookeper port.
* @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
* @param groupId The group id for this consumer.
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
@ -146,14 +145,13 @@ class StreamingContext private (
* @param storageLevel RDD storage level. Defaults to memory-only.
*/
def kafkaStream[T: ClassManifest](
hostname: String,
port: Int,
zkQuorum: String,
groupId: String,
topics: Map[String, Int],
initialOffsets: Map[KafkaPartitionKey, Long] = Map[KafkaPartitionKey, Long](),
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2
): DStream[T] = {
val inputStream = new KafkaInputDStream[T](this, hostname, port, groupId, topics, initialOffsets, storageLevel)
val inputStream = new KafkaInputDStream[T](this, zkQuorum, groupId, topics, initialOffsets, storageLevel)
registerInputStream(inputStream)
inputStream
}

View file

@ -19,21 +19,11 @@ import scala.collection.JavaConversions._
// Key for a specific Kafka Partition: (broker, topic, group, part)
case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, partId: Int)
// NOT USED - Originally intended for fault-tolerance
// Metadata for a Kafka Stream that it sent to the Master
private[streaming]
case class KafkaInputDStreamMetadata(timestamp: Long, data: Map[KafkaPartitionKey, Long])
// NOT USED - Originally intended for fault-tolerance
// Checkpoint data specific to a KafkaInputDstream
private[streaming]
case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any],
savedOffsets: Map[KafkaPartitionKey, Long]) extends DStreamCheckpointData(kafkaRdds)
/**
* Input stream that pulls messages from a Kafka Broker.
*
* @param host Zookeper hostname.
* @param port Zookeper port.
* @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
* @param groupId The group id for this consumer.
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
@ -44,65 +34,21 @@ case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any],
private[streaming]
class KafkaInputDStream[T: ClassManifest](
@transient ssc_ : StreamingContext,
host: String,
port: Int,
zkQuorum: String,
groupId: String,
topics: Map[String, Int],
initialOffsets: Map[KafkaPartitionKey, Long],
storageLevel: StorageLevel
) extends NetworkInputDStream[T](ssc_ ) with Logging {
// Metadata that keeps track of which messages have already been consumed.
var savedOffsets = HashMap[Long, Map[KafkaPartitionKey, Long]]()
/* NOT USED - Originally intended for fault-tolerance
// In case of a failure, the offets for a particular timestamp will be restored.
@transient var restoredOffsets : Map[KafkaPartitionKey, Long] = null
override protected[streaming] def addMetadata(metadata: Any) {
metadata match {
case x : KafkaInputDStreamMetadata =>
savedOffsets(x.timestamp) = x.data
// TOOD: Remove logging
logInfo("New saved Offsets: " + savedOffsets)
case _ => logInfo("Received unknown metadata: " + metadata.toString)
}
}
override protected[streaming] def updateCheckpointData(currentTime: Time) {
super.updateCheckpointData(currentTime)
if(savedOffsets.size > 0) {
// Find the offets that were stored before the checkpoint was initiated
val key = savedOffsets.keys.toList.sortWith(_ < _).filter(_ < currentTime.millis).last
val latestOffsets = savedOffsets(key)
logInfo("Updating KafkaDStream checkpoint data: " + latestOffsets.toString)
checkpointData = KafkaDStreamCheckpointData(checkpointData.rdds, latestOffsets)
// TODO: This may throw out offsets that are created after the checkpoint,
// but it's unlikely we'll need them.
savedOffsets.clear()
}
}
override protected[streaming] def restoreCheckpointData() {
super.restoreCheckpointData()
logInfo("Restoring KafkaDStream checkpoint data.")
checkpointData match {
case x : KafkaDStreamCheckpointData =>
restoredOffsets = x.savedOffsets
logInfo("Restored KafkaDStream offsets: " + savedOffsets)
}
} */
def createReceiver(): NetworkReceiver[T] = {
new KafkaReceiver(host, port, groupId, topics, initialOffsets, storageLevel)
new KafkaReceiver(zkQuorum, groupId, topics, initialOffsets, storageLevel)
.asInstanceOf[NetworkReceiver[T]]
}
}
private[streaming]
class KafkaReceiver(host: String, port: Int, groupId: String,
class KafkaReceiver(zkQuorum: String, groupId: String,
topics: Map[String, Int], initialOffsets: Map[KafkaPartitionKey, Long],
storageLevel: StorageLevel) extends NetworkReceiver[Any] {
@ -111,8 +57,6 @@ class KafkaReceiver(host: String, port: Int, groupId: String,
// Handles pushing data into the BlockManager
lazy protected val blockGenerator = new BlockGenerator(storageLevel)
// Keeps track of the current offsets. Maps from (broker, topic, group, part) -> Offset
lazy val offsets = HashMap[KafkaPartitionKey, Long]()
// Connection to Kafka
var consumerConnector : ZookeeperConsumerConnector = null
@ -127,24 +71,23 @@ class KafkaReceiver(host: String, port: Int, groupId: String,
// In case we are using multiple Threads to handle Kafka Messages
val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _))
val zooKeeperEndPoint = host + ":" + port
logInfo("Starting Kafka Consumer Stream with group: " + groupId)
logInfo("Initial offsets: " + initialOffsets.toString)
// Zookeper connection properties
val props = new Properties()
props.put("zk.connect", zooKeeperEndPoint)
props.put("zk.connect", zkQuorum)
props.put("zk.connectiontimeout.ms", ZK_TIMEOUT.toString)
props.put("groupid", groupId)
// Create the connection to the cluster
logInfo("Connecting to Zookeper: " + zooKeeperEndPoint)
logInfo("Connecting to Zookeper: " + zkQuorum)
val consumerConfig = new ConsumerConfig(props)
consumerConnector = Consumer.create(consumerConfig).asInstanceOf[ZookeeperConsumerConnector]
logInfo("Connected to " + zooKeeperEndPoint)
logInfo("Connected to " + zkQuorum)
// Reset the Kafka offsets in case we are recovering from a failure
resetOffsets(initialOffsets)
// If specified, set the topic offset
setOffsets(initialOffsets)
// Create Threads for each Topic/Message Stream we are listening
val topicMessageStreams = consumerConnector.createMessageStreams(topics, new StringDecoder())
@ -157,7 +100,7 @@ class KafkaReceiver(host: String, port: Int, groupId: String,
}
// Overwrites the offets in Zookeper.
private def resetOffsets(offsets: Map[KafkaPartitionKey, Long]) {
private def setOffsets(offsets: Map[KafkaPartitionKey, Long]) {
offsets.foreach { case(key, offset) =>
val topicDirs = new ZKGroupTopicDirs(key.groupId, key.topic)
val partitionName = key.brokerId + "-" + key.partId
@ -173,28 +116,9 @@ class KafkaReceiver(host: String, port: Int, groupId: String,
stream.takeWhile { msgAndMetadata =>
blockGenerator += msgAndMetadata.message
// Updating the offet. The key is (broker, topic, group, partition).
val key = KafkaPartitionKey(msgAndMetadata.topicInfo.brokerId, msgAndMetadata.topic,
groupId, msgAndMetadata.topicInfo.partition.partId)
val offset = msgAndMetadata.topicInfo.getConsumeOffset
offsets.put(key, offset)
// logInfo("Handled message: " + (key, offset).toString)
// Keep on handling messages
true
}
}
}
// NOT USED - Originally intended for fault-tolerance
// class KafkaDataHandler(receiver: KafkaReceiver, storageLevel: StorageLevel)
// extends BufferingBlockCreator[Any](receiver, storageLevel) {
// override def createBlock(blockId: String, iterator: Iterator[Any]) : Block = {
// // Creates a new Block with Kafka-specific Metadata
// new Block(blockId, iterator, KafkaInputDStreamMetadata(System.currentTimeMillis, offsets.toMap))
// }
// }
}