[SPARK-30336][SQL][SS] Move Kafka consumer-related classes to its own package

### What changes were proposed in this pull request?

There are too many classes placed in the single package "org.apache.spark.sql.kafka010", and these classes can be grouped by purpose.

As a part of the change in SPARK-21869 (#26845), we moved producer-related classes to "org.apache.spark.sql.kafka010.producer" and exposed only the necessary classes/methods to the outside of the package. This patch applies the same approach to consumer-related classes.
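
To make the visibility rules concrete, here is a minimal, hypothetical sketch (the names `InternalHelper`, `ConsumerFacade`, and `SomeReader` are illustrative, not actual Spark classes): members scoped `private[consumer]` stay internal to the new sub-package, while the small surface the rest of the connector needs is widened to `private[kafka010]`.

```scala
// Hypothetical sketch of the visibility layout after the move; names are illustrative only.
package org.apache.spark.sql.kafka010.consumer {

  // Internal detail: visible only within the consumer sub-package.
  private[consumer] class InternalHelper {
    def fetch(): String = "record"
  }

  // Facade: exposed to everything under org.apache.spark.sql.kafka010.
  private[kafka010] class ConsumerFacade {
    private val helper = new InternalHelper
    def next(): String = helper.fetch()
  }
}

package org.apache.spark.sql.kafka010 {

  private[kafka010] object SomeReader {
    // Compiles: the facade is visible to the parent package.
    val facade = new consumer.ConsumerFacade

    // Would not compile: InternalHelper is private[consumer].
    // val helper = new consumer.InternalHelper
  }
}
```

Scala's package-qualified `private[...]` modifiers make this split possible without adding any public API.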

### Why are the changes needed?

Described above.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Existing UTs.

Closes #26991 from HeartSaVioR/SPARK-30336.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
Authored by Jungtaek Lim (HeartSaVioR) on 2019-12-31 09:30:55 -06:00, committed by Sean Owen
parent 23a49aff27
commit 319ccd5711
9 changed files with 37 additions and 32 deletions


@@ -23,6 +23,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
+import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer

 /** A [[InputPartition]] for reading Kafka data in a batch based streaming query. */
 private[kafka010] case class KafkaBatchInputPartition(


@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.connector.read.streaming.{ContinuousPartitionReader, ContinuousPartitionReaderFactory, ContinuousStream, Offset, PartitionOffset}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
+import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer
 import org.apache.spark.sql.util.CaseInsensitiveStringMap

 /**


@@ -19,18 +19,15 @@ package org.apache.spark.sql.kafka010
 import java.{util => ju}

-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
+import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.common.TopicPartition

 import org.apache.spark.{Partition, SparkContext, TaskContext}
-import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.NextIterator

 /** Offset range that one partition of the KafkaSourceRDD has to read */
 private[kafka010] case class KafkaSourceRDDOffsetRange(
     topicPartition: TopicPartition,


@@ -15,7 +15,7 @@
  * limitations under the License.
  */

-package org.apache.spark.sql.kafka010
+package org.apache.spark.sql.kafka010.consumer

 import java.{util => ju}
 import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeUnit}
@@ -27,7 +27,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.kafka010.KafkaDataConsumer.{CacheKey, UNKNOWN_OFFSET}
+import org.apache.spark.sql.kafka010.{FETCHED_DATA_CACHE_EVICTOR_THREAD_RUN_INTERVAL, FETCHED_DATA_CACHE_TIMEOUT}
+import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.{CacheKey, UNKNOWN_OFFSET}
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}

 /**
@@ -39,7 +40,7 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
  * modified in same instance, this class cannot be replaced with general pool implementations
  * including Apache Commons Pool which pools KafkaConsumer.
  */
-private[kafka010] class FetchedDataPool(
+private[consumer] class FetchedDataPool(
     executorService: ScheduledExecutorService,
     clock: Clock,
     conf: SparkConf) extends Logging {
@@ -159,8 +160,8 @@ private[kafka010] class FetchedDataPool(
   }
 }

-private[kafka010] object FetchedDataPool {
-  private[kafka010] case class CachedFetchedData(fetchedData: FetchedData) {
+private[consumer] object FetchedDataPool {
+  private[consumer] case class CachedFetchedData(fetchedData: FetchedData) {
     var lastReleasedTimestamp: Long = Long.MaxValue
     var lastAcquiredTimestamp: Long = Long.MinValue
     var inUse: Boolean = false
@@ -179,5 +180,5 @@ private[kafka010] object FetchedDataPool {
     }
   }

-  private[kafka010] type CachedFetchedDataList = mutable.ListBuffer[CachedFetchedData]
+  private[consumer] type CachedFetchedDataList = mutable.ListBuffer[CachedFetchedData]
 }


@@ -15,7 +15,7 @@
  * limitations under the License.
  */

-package org.apache.spark.sql.kafka010
+package org.apache.spark.sql.kafka010.consumer

 import java.{util => ju}
 import java.util.concurrent.ConcurrentHashMap
@@ -25,8 +25,9 @@ import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, DefaultPooledObject
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._
-import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+import org.apache.spark.sql.kafka010._
+import org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumerPool._
+import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.CacheKey

 /**
  * Provides object pool for [[InternalKafkaConsumer]] which is grouped by [[CacheKey]].
@@ -45,10 +46,9 @@ import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
 * not yet returned, hence provide thread-safety usage of non-thread-safe [[InternalKafkaConsumer]]
 * unless caller shares the object to multiple threads.
 */
-private[kafka010] class InternalKafkaConsumerPool(
+private[consumer] class InternalKafkaConsumerPool(
     objectFactory: ObjectFactory,
     poolConfig: PoolConfig) extends Logging {
-
   def this(conf: SparkConf) = {
     this(new ObjectFactory, new PoolConfig(conf))
   }
@@ -147,7 +147,7 @@ private[kafka010] class InternalKafkaConsumerPool(
   }
 }

-private[kafka010] object InternalKafkaConsumerPool {
+private[consumer] object InternalKafkaConsumerPool {
   object CustomSwallowedExceptionListener extends SwallowedExceptionListener with Logging {
     override def onSwallowException(e: Exception): Unit = {
       logError(s"Error closing Kafka consumer", e)
@@ -218,4 +218,3 @@
     }
   }
 }
-


@@ -15,7 +15,7 @@
  * limitations under the License.
  */

-package org.apache.spark.sql.kafka010
+package org.apache.spark.sql.kafka010.consumer

 import java.{util => ju}
 import java.io.Closeable
@@ -29,9 +29,9 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.spark.{SparkEnv, TaskContext}
 import org.apache.spark.internal.Logging
-import org.apache.spark.kafka010.{KafkaConfigUpdater, KafkaTokenClusterConf, KafkaTokenUtil}
-import org.apache.spark.sql.kafka010.KafkaDataConsumer.{AvailableOffsetRange, UNKNOWN_OFFSET}
+import org.apache.spark.kafka010.{KafkaConfigUpdater, KafkaTokenUtil}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
+import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.{AvailableOffsetRange, UNKNOWN_OFFSET}
 import org.apache.spark.util.{ShutdownHookManager, UninterruptibleThread}

 /**
@@ -47,13 +47,15 @@ private[kafka010] class InternalKafkaConsumer(
   val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]

-  private[kafka010] val clusterConfig = KafkaTokenUtil.findMatchingTokenClusterConfig(
+  // Exposed for testing
+  private[consumer] val clusterConfig = KafkaTokenUtil.findMatchingTokenClusterConfig(
     SparkEnv.get.conf, kafkaParams.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
       .asInstanceOf[String])

   // Kafka consumer is not able to give back the params instantiated with so we need to store it.
   // It must be updated whenever a new consumer is created.
-  private[kafka010] var kafkaParamsWithSecurity: ju.Map[String, Object] = _
+  // Exposed for testing
+  private[consumer] var kafkaParamsWithSecurity: ju.Map[String, Object] = _
   private val consumer = createConsumer()

   /**
@@ -139,7 +141,7 @@
  * @param _offsetAfterPoll the Kafka offset after calling `poll`. We will use this offset to
  *                         poll when `records` is drained.
  */
-private[kafka010] case class FetchedData(
+private[consumer] case class FetchedData(
     private var _records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
     private var _nextOffsetInFetchedData: Long,
     private var _offsetAfterPoll: Long) {
@@ -196,7 +198,7 @@ private[kafka010] case class FetchedData(
 * `isolation.level` is `read_committed`), and the caller should use `nextOffsetToFetch` to fetch
 * instead.
 */
-private[kafka010] case class FetchedRecord(
+private[consumer] case class FetchedRecord(
     var record: ConsumerRecord[Array[Byte], Array[Byte]],
     var nextOffsetToFetch: Long) {
@@ -223,7 +225,8 @@ private[kafka010] class KafkaDataConsumer(
     fetchedDataPool: FetchedDataPool) extends Logging {
   import KafkaDataConsumer._

-  @volatile private[kafka010] var _consumer: Option[InternalKafkaConsumer] = None
+  // Exposed for testing
+  @volatile private[consumer] var _consumer: Option[InternalKafkaConsumer] = None
   @volatile private var _fetchedData: Option[FetchedData] = None

   private val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]


@@ -15,7 +15,7 @@
  * limitations under the License.
  */

-package org.apache.spark.sql.kafka010
+package org.apache.spark.sql.kafka010.consumer

 import java.{util => ju}
 import java.util.concurrent.TimeUnit
@@ -29,7 +29,8 @@ import org.jmock.lib.concurrent.DeterministicScheduler
 import org.scalatest.PrivateMethodTester

 import org.apache.spark.SparkConf
-import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+import org.apache.spark.sql.kafka010.{FETCHED_DATA_CACHE_EVICTOR_THREAD_RUN_INTERVAL, FETCHED_DATA_CACHE_TIMEOUT}
+import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.CacheKey
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.util.ManualClock


@@ -15,7 +15,7 @@
  * limitations under the License.
  */

-package org.apache.spark.sql.kafka010
+package org.apache.spark.sql.kafka010.consumer

 import java.{util => ju}
@@ -26,7 +26,8 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.serialization.ByteArrayDeserializer

 import org.apache.spark.SparkConf
-import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+import org.apache.spark.sql.kafka010.{CONSUMER_CACHE_CAPACITY, CONSUMER_CACHE_EVICTOR_THREAD_RUN_INTERVAL, CONSUMER_CACHE_TIMEOUT}
+import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.CacheKey
 import org.apache.spark.sql.test.SharedSparkSession

 class InternalKafkaConsumerPoolSuite extends SharedSparkSession {


@@ -15,7 +15,7 @@
  * limitations under the License.
  */

-package org.apache.spark.sql.kafka010
+package org.apache.spark.sql.kafka010.consumer

 import java.{util => ju}
 import java.nio.charset.StandardCharsets
@@ -32,7 +32,8 @@ import org.scalatest.PrivateMethodTester
 import org.apache.spark.{TaskContext, TaskContextImpl}
 import org.apache.spark.kafka010.KafkaDelegationTokenTest
-import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+import org.apache.spark.sql.kafka010.{KafkaTestUtils, RecordBuilder}
+import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.CacheKey
 import org.apache.spark.sql.test.SharedSparkSession

 class KafkaDataConsumerSuite