fixing memory leak in kafka MessageHandler
This commit is contained in:
parent
cfa8e769a8
commit
d069283211
|
@ -114,11 +114,8 @@ class KafkaReceiver(kafkaParams: Map[String, String],
|
|||
private class MessageHandler(stream: KafkaStream[String]) extends Runnable {
|
||||
def run() {
|
||||
logInfo("Starting MessageHandler.")
|
||||
stream.takeWhile { msgAndMetadata =>
|
||||
for (msgAndMetadata <- stream) {
|
||||
blockGenerator += msgAndMetadata.message
|
||||
// Keep on handling messages
|
||||
|
||||
true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue