From 2bdbc87052389ff69404347fbc69457132dbcafd Mon Sep 17 00:00:00 2001
From: Liwei Lin
Date: Sun, 12 Feb 2017 23:00:22 -0800
Subject: [PATCH] [SPARK-19564][SPARK-19559][SS][KAFKA] KafkaOffsetReader's consumers should not be in the same group

## What changes were proposed in this pull request?

In `KafkaOffsetReader`, when an error occurs, we abort the existing consumer and create a new consumer. In our current implementation, the first consumer and the second consumer would be in the same group (which leads to SPARK-19559), **_violating our intention of the two consumers not being in the same group._**

The cause is that, in our current implementation, the first consumer is created before `groupId` and `nextId` are initialized in the constructor. Then even if `groupId` and `nextId` are updated during the creation of that first consumer, their initializers still run afterwards in the constructor and reset them to their default values, so the second consumer is created with the same group id as the first.

We should make sure that `groupId` and `nextId` are initialized before any consumer is created.

## How was this patch tested?

Ran `KafkaSourceSuite` 100 times; all runs passed.

Author: Liwei Lin

Closes #16902 from lw-lin/SPARK-19564-.
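
The initialization-order problem described above can be reproduced in isolation. The following is a minimal sketch, not the actual `KafkaOffsetReader` code: `BrokenReader`, `FixedReader`, `firstGroup`, and `InitOrderDemo` are hypothetical names, and a plain `String` stands in for the consumer. It relies only on the fact that Scala runs class-body initializers in textual order.

```scala
// Hypothetical stand-in for the pre-patch layout: the "consumer" (here just
// the group id it was created with) is initialized BEFORE groupId/nextId.
class BrokenReader(prefix: String) {
  // Runs first: nextGroupId() sees nextId == 0 (JVM default) and bumps it to 1.
  val firstGroup: String = nextGroupId()

  // These initializers run afterwards and reset the state set above.
  private var groupId: String = null
  private var nextId = 0

  def nextGroupId(): String = {
    groupId = prefix + "-" + nextId
    nextId += 1
    groupId
  }
}

// Hypothetical stand-in for the patched layout: the counters are declared
// first, so nothing overwrites them after the first "consumer" is created.
class FixedReader(prefix: String) {
  private var groupId: String = null
  private var nextId = 0

  val firstGroup: String = nextGroupId()

  def nextGroupId(): String = {
    groupId = prefix + "-" + nextId
    nextId += 1
    groupId
  }
}

object InitOrderDemo {
  def main(args: Array[String]): Unit = {
    val broken = new BrokenReader("g")
    println(broken.firstGroup)    // g-0
    println(broken.nextGroupId()) // g-0 again: the second "consumer" shares the group

    val fixed = new FixedReader("g")
    println(fixed.firstGroup)     // g-0
    println(fixed.nextGroupId())  // g-1: a distinct group
  }
}
```

In the sketch, moving the two `var` declarations above the first call to `nextGroupId()` is the only difference between the two classes, which mirrors the reordering this patch makes.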
---
 .../apache/spark/sql/kafka010/KafkaOffsetReader.scala | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
index 6b2fb3c112..2696d6f089 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
@@ -64,6 +64,13 @@ private[kafka010] class KafkaOffsetReader(
   })
   val execContext = ExecutionContext.fromExecutorService(kafkaReaderThread)
 
+  /**
+   * Place [[groupId]] and [[nextId]] here so that they are initialized before any consumer is
+   * created -- see SPARK-19564.
+   */
+  private var groupId: String = null
+  private var nextId = 0
+
   /**
    * A KafkaConsumer used in the driver to query the latest Kafka offsets. This only queries the
    * offsets and never commits them.
@@ -76,10 +83,6 @@ private[kafka010] class KafkaOffsetReader(
   private val offsetFetchAttemptIntervalMs =
     readerOptions.getOrElse("fetchOffset.retryIntervalMs", "1000").toLong
 
-  private var groupId: String = null
-
-  private var nextId = 0
-
   private def nextGroupId(): String = {
     groupId = driverGroupIdPrefix + "-" + nextId
     nextId += 1