Removing offset management code that is non-existent in kafka 0.7.0+
This commit is contained in:
parent
82b0cc90ca
commit
b61a4ec773
|
@ -173,13 +173,6 @@ class KafkaReceiver(host: String, port: Int, groupId: String,
|
|||
stream.takeWhile { msgAndMetadata =>
|
||||
blockGenerator += msgAndMetadata.message
|
||||
|
||||
// Updating the offet. The key is (broker, topic, group, partition).
|
||||
val key = KafkaPartitionKey(msgAndMetadata.topicInfo.brokerId, msgAndMetadata.topic,
|
||||
groupId, msgAndMetadata.topicInfo.partition.partId)
|
||||
val offset = msgAndMetadata.topicInfo.getConsumeOffset
|
||||
offsets.put(key, offset)
|
||||
// logInfo("Handled message: " + (key, offset).toString)
|
||||
|
||||
// Keep on handling messages
|
||||
true
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue