[SPARK-24056][SS] Make consumer creation lazy in Kafka source for Structured streaming

## What changes were proposed in this pull request?

Currently, the driver side of the Kafka source (i.e. KafkaMicroBatchReader) eagerly creates a consumer as soon as the Kafk aMicroBatchReader is created. However, we create dummy KafkaMicroBatchReader to get the schema and immediately stop it. Its better to make the consumer creation lazy, it will be created on the first attempt to fetch offsets using the KafkaOffsetReader.

## How was this patch tested?
Existing unit tests

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #21134 from tdas/SPARK-24056.
This commit is contained in:
Tathagata Das 2018-04-24 14:33:33 -07:00
parent 379bffa052
commit 7b1e6523af

View file

@ -75,7 +75,17 @@ private[kafka010] class KafkaOffsetReader(
* A KafkaConsumer used in the driver to query the latest Kafka offsets. This only queries the
* offsets and never commits them.
*/
protected var consumer = createConsumer()
@volatile protected var _consumer: Consumer[Array[Byte], Array[Byte]] = null
protected def consumer: Consumer[Array[Byte], Array[Byte]] = synchronized {
assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
if (_consumer == null) {
val newKafkaParams = new ju.HashMap[String, Object](driverKafkaParams)
newKafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, nextGroupId())
_consumer = consumerStrategy.createConsumer(newKafkaParams)
}
_consumer
}
private val maxOffsetFetchAttempts =
readerOptions.getOrElse("fetchOffset.numRetries", "3").toInt
@ -95,9 +105,7 @@ private[kafka010] class KafkaOffsetReader(
* Closes the connection to Kafka, and cleans up state.
*/
def close(): Unit = {
runUninterruptibly {
consumer.close()
}
if (_consumer != null) runUninterruptibly { stopConsumer() }
kafkaReaderThread.shutdown()
}
@ -304,19 +312,14 @@ private[kafka010] class KafkaOffsetReader(
}
}
/**
* Create a consumer using the new generated group id. We always use a new consumer to avoid
* just using a broken consumer to retry on Kafka errors, which likely will fail again.
*/
private def createConsumer(): Consumer[Array[Byte], Array[Byte]] = synchronized {
val newKafkaParams = new ju.HashMap[String, Object](driverKafkaParams)
newKafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, nextGroupId())
consumerStrategy.createConsumer(newKafkaParams)
private def stopConsumer(): Unit = synchronized {
assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
if (_consumer != null) _consumer.close()
}
private def resetConsumer(): Unit = synchronized {
consumer.close()
consumer = createConsumer()
stopConsumer()
_consumer = null // will automatically get reinitialized again
}
}