diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
index 645b68b0c4..8b37fd6e7e 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
@@ -23,6 +23,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
+import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer
 
 /** A [[InputPartition]] for reading Kafka data in a batch based streaming query. */
 private[kafka010] case class KafkaBatchInputPartition(
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
index 0603ae39ba..0b549870a3 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.connector.read.streaming.{ContinuousPartitionReader, ContinuousPartitionReaderFactory, ContinuousStream, Offset, PartitionOffset}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
+import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 /**
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
index dae9515205..f1f3871fc7 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
@@ -19,18 +19,15 @@ package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
 
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
+import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.common.TopicPartition
 
 import org.apache.spark.{Partition, SparkContext, TaskContext}
-import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.NextIterator
 
-
 /** Offset range that one partition of the KafkaSourceRDD has to read */
 private[kafka010] case class KafkaSourceRDDOffsetRange(
     topicPartition: TopicPartition,
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/FetchedDataPool.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPool.scala
similarity index 92%
rename from external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/FetchedDataPool.scala
rename to external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPool.scala
index 6f18407a17..6174bfb203 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/FetchedDataPool.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPool.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.kafka010
+package org.apache.spark.sql.kafka010.consumer
 
 import java.{util => ju}
 import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeUnit}
@@ -27,7 +27,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecord
 
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.kafka010.KafkaDataConsumer.{CacheKey, UNKNOWN_OFFSET}
+import org.apache.spark.sql.kafka010.{FETCHED_DATA_CACHE_EVICTOR_THREAD_RUN_INTERVAL, FETCHED_DATA_CACHE_TIMEOUT}
+import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.{CacheKey, UNKNOWN_OFFSET}
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
 
 /**
@@ -39,7 +40,7 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
  * modified in same instance, this class cannot be replaced with general pool implementations
  * including Apache Commons Pool which pools KafkaConsumer.
  */
-private[kafka010] class FetchedDataPool(
+private[consumer] class FetchedDataPool(
     executorService: ScheduledExecutorService,
     clock: Clock,
     conf: SparkConf) extends Logging {
@@ -159,8 +160,8 @@ private[kafka010] class FetchedDataPool(
   }
 }
 
-private[kafka010] object FetchedDataPool {
-  private[kafka010] case class CachedFetchedData(fetchedData: FetchedData) {
+private[consumer] object FetchedDataPool {
+  private[consumer] case class CachedFetchedData(fetchedData: FetchedData) {
     var lastReleasedTimestamp: Long = Long.MaxValue
     var lastAcquiredTimestamp: Long = Long.MinValue
     var inUse: Boolean = false
@@ -179,5 +180,5 @@ private[kafka010] object FetchedDataPool {
    }
  }
 
-  private[kafka010] type CachedFetchedDataList = mutable.ListBuffer[CachedFetchedData]
+  private[consumer] type CachedFetchedDataList = mutable.ListBuffer[CachedFetchedData]
 }
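The FetchedDataPool scaladoc above explains why a general-purpose pool such as Apache Commons Pool cannot be used here: the pooled FetchedData objects are mutated in place while cached. Below is a minimal, self-contained sketch of that per-key pooling pattern; `SimpleFetchedDataPool`, `Entry`, and the simplified `CacheKey`/`FetchedData` types are hypothetical stand-ins for the real class and its CachedFetchedData bookkeeping (no eviction, no timestamps).

```scala
import scala.collection.mutable

// Hypothetical stand-ins for the real CacheKey and FetchedData types.
case class CacheKey(groupId: String, topic: String, partition: Int)
class FetchedData(var nextOffsetInFetchedData: Long)

// Sketch of per-key pooling for mutable objects: each key maps to a list of
// entries, and an entry is handed out exclusively until it is released.
class SimpleFetchedDataPool {
  private case class Entry(data: FetchedData, var inUse: Boolean)
  private val cache = mutable.Map.empty[CacheKey, mutable.ListBuffer[Entry]]

  def acquire(key: CacheKey): FetchedData = synchronized {
    val entries = cache.getOrElseUpdate(key, mutable.ListBuffer.empty)
    entries.find(!_.inUse) match {
      case Some(entry) =>
        entry.inUse = true
        entry.data
      case None =>
        // No idle entry for this key: create one and hand it out immediately.
        val entry = Entry(new FetchedData(-1L), inUse = true)
        entries += entry
        entry.data
    }
  }

  def release(key: CacheKey, data: FetchedData): Unit = synchronized {
    // Mark the exact instance idle again; its mutated state is kept for reuse.
    cache.get(key).flatMap(_.find(_.data eq data)).foreach(e => e.inUse = false)
  }
}
```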
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/InternalKafkaConsumerPool.scala
similarity index 96%
rename from external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala
rename to external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/InternalKafkaConsumerPool.scala
index 276a942742..2256f96c66 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/InternalKafkaConsumerPool.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.kafka010
+package org.apache.spark.sql.kafka010.consumer
 
 import java.{util => ju}
 import java.util.concurrent.ConcurrentHashMap
@@ -25,8 +25,9 @@ import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, DefaultPooledObject
 
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._
-import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+import org.apache.spark.sql.kafka010._
+import org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumerPool._
+import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.CacheKey
 
 /**
  * Provides object pool for [[InternalKafkaConsumer]] which is grouped by [[CacheKey]].
@@ -45,10 +46,9 @@ import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
  * not yet returned, hence provide thread-safety usage of non-thread-safe [[InternalKafkaConsumer]]
  * unless caller shares the object to multiple threads.
  */
-private[kafka010] class InternalKafkaConsumerPool(
+private[consumer] class InternalKafkaConsumerPool(
     objectFactory: ObjectFactory,
     poolConfig: PoolConfig) extends Logging {
-
   def this(conf: SparkConf) = {
     this(new ObjectFactory, new PoolConfig(conf))
   }
@@ -147,7 +147,7 @@ private[kafka010] class InternalKafkaConsumerPool(
   }
 }
 
-private[kafka010] object InternalKafkaConsumerPool {
+private[consumer] object InternalKafkaConsumerPool {
   object CustomSwallowedExceptionListener extends SwallowedExceptionListener with Logging {
     override def onSwallowException(e: Exception): Unit = {
       logError(s"Error closing Kafka consumer", e)
@@ -218,4 +218,3 @@ private[kafka010] object InternalKafkaConsumerPool {
     }
   }
 }
-
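Unlike FetchedDataPool, InternalKafkaConsumerPool can delegate to Apache Commons Pool 2 (its imports above reference DefaultEvictionPolicy, DefaultPooledObject, and SwallowedExceptionListener), because each borrowed consumer is held exclusively until it is returned. A sketch of that keyed-pool pattern follows, under simplified assumptions: `PooledConsumer` and a plain `String` key stand in for the real InternalKafkaConsumer and CacheKey, and Commons Pool 2.6+ is assumed (where the config class is generic).

```scala
import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, PooledObject}
import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}

// Hypothetical value type standing in for the real InternalKafkaConsumer.
class PooledConsumer(val key: String) {
  def close(): Unit = ()
}

// Factory that tells Commons Pool how to create, wrap, and destroy pooled
// objects for a given key.
class PooledConsumerFactory extends BaseKeyedPooledObjectFactory[String, PooledConsumer] {
  override def create(key: String): PooledConsumer = new PooledConsumer(key)
  override def wrap(value: PooledConsumer): PooledObject[PooledConsumer] =
    new DefaultPooledObject(value)
  override def destroyObject(key: String, p: PooledObject[PooledConsumer]): Unit =
    p.getObject.close()
}

object KeyedPoolExample {
  def main(args: Array[String]): Unit = {
    val config = new GenericKeyedObjectPoolConfig[PooledConsumer]()
    config.setMaxTotal(64) // overall cap, in the spirit of CONSUMER_CACHE_CAPACITY
    val pool = new GenericKeyedObjectPool(new PooledConsumerFactory, config)

    // A borrowed object is held exclusively until returned, which is what
    // makes pooling a non-thread-safe object (like a KafkaConsumer) safe.
    val consumer = pool.borrowObject("groupId-topic-0")
    try {
      // ... use the consumer ...
    } finally {
      pool.returnObject("groupId-topic-0", consumer)
    }
    pool.close()
  }
}
```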
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
similarity index 98%
rename from external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
rename to external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
index 58fd6608db..7aa386f74c 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.kafka010
+package org.apache.spark.sql.kafka010.consumer
 
 import java.{util => ju}
 import java.io.Closeable
@@ -29,9 +29,9 @@ import org.apache.kafka.common.TopicPartition
 
 import org.apache.spark.{SparkEnv, TaskContext}
 import org.apache.spark.internal.Logging
-import org.apache.spark.kafka010.{KafkaConfigUpdater, KafkaTokenClusterConf, KafkaTokenUtil}
-import org.apache.spark.sql.kafka010.KafkaDataConsumer.{AvailableOffsetRange, UNKNOWN_OFFSET}
+import org.apache.spark.kafka010.{KafkaConfigUpdater, KafkaTokenUtil}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
+import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.{AvailableOffsetRange, UNKNOWN_OFFSET}
 import org.apache.spark.util.{ShutdownHookManager, UninterruptibleThread}
 
 /**
@@ -47,13 +47,15 @@ private[kafka010] class InternalKafkaConsumer(
 
   val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
 
-  private[kafka010] val clusterConfig = KafkaTokenUtil.findMatchingTokenClusterConfig(
+  // Exposed for testing
+  private[consumer] val clusterConfig = KafkaTokenUtil.findMatchingTokenClusterConfig(
     SparkEnv.get.conf, kafkaParams.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
       .asInstanceOf[String])
 
   // Kafka consumer is not able to give back the params instantiated with so we need to store it.
   // It must be updated whenever a new consumer is created.
-  private[kafka010] var kafkaParamsWithSecurity: ju.Map[String, Object] = _
+  // Exposed for testing
+  private[consumer] var kafkaParamsWithSecurity: ju.Map[String, Object] = _
   private val consumer = createConsumer()
 
   /**
@@ -139,7 +141,7 @@ private[kafka010] class InternalKafkaConsumer(
  * @param _offsetAfterPoll the Kafka offset after calling `poll`. We will use this offset to
  *                         poll when `records` is drained.
  */
-private[kafka010] case class FetchedData(
+private[consumer] case class FetchedData(
    private var _records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
    private var _nextOffsetInFetchedData: Long,
    private var _offsetAfterPoll: Long) {
@@ -196,7 +198,7 @@ private[kafka010] case class FetchedData(
  * `isolation.level` is `read_committed`), and the caller should use `nextOffsetToFetch` to fetch
  * instead.
  */
-private[kafka010] case class FetchedRecord(
+private[consumer] case class FetchedRecord(
    var record: ConsumerRecord[Array[Byte], Array[Byte]],
    var nextOffsetToFetch: Long) {
 
@@ -223,7 +225,8 @@ private[kafka010] class KafkaDataConsumer(
     fetchedDataPool: FetchedDataPool) extends Logging {
   import KafkaDataConsumer._
 
-  @volatile private[kafka010] var _consumer: Option[InternalKafkaConsumer] = None
+  // Exposed for testing
+  @volatile private[consumer] var _consumer: Option[InternalKafkaConsumer] = None
   @volatile private var _fetchedData: Option[FetchedData] = None
 
   private val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
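KafkaDataConsumer keeps its pooled resources in @volatile Option fields (`_consumer` and `_fetchedData` above), borrowing lazily on first use and clearing the cache when released. The following is a simplified illustration of that acquire-and-cache pattern, not the real class; it assumes, as the real one does, that a single task thread drives each instance, so no further synchronization is shown.

```scala
// Sketch of lazy borrow-and-cache around a pool: the first access borrows,
// later accesses reuse the cached instance, release() returns it to the pool.
class LazyPooledResource[T](borrow: () => T, giveBack: T => Unit) {
  @volatile private var cached: Option[T] = None

  def get: T = cached match {
    case Some(value) => value
    case None =>
      val value = borrow() // borrow from the underlying pool on first use
      cached = Some(value)
      value
  }

  def release(): Unit = {
    cached.foreach(giveBack) // hand the instance back to the pool
    cached = None
  }
}
```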
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/FetchedDataPoolSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPoolSuite.scala
similarity index 97%
rename from external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/FetchedDataPoolSuite.scala
rename to external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPoolSuite.scala
index 5449f5d733..23bab5cd48 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/FetchedDataPoolSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPoolSuite.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.kafka010
+package org.apache.spark.sql.kafka010.consumer
 
 import java.{util => ju}
 import java.util.concurrent.TimeUnit
@@ -29,7 +29,8 @@ import org.jmock.lib.concurrent.DeterministicScheduler
 import org.scalatest.PrivateMethodTester
 
 import org.apache.spark.SparkConf
-import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+import org.apache.spark.sql.kafka010.{FETCHED_DATA_CACHE_EVICTOR_THREAD_RUN_INTERVAL, FETCHED_DATA_CACHE_TIMEOUT}
+import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.CacheKey
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.util.ManualClock
 
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPoolSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/InternalKafkaConsumerPoolSuite.scala
similarity index 97%
rename from external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPoolSuite.scala
rename to external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/InternalKafkaConsumerPoolSuite.scala
index 78d7feef58..3797d5b5bd 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPoolSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/InternalKafkaConsumerPoolSuite.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.kafka010
+package org.apache.spark.sql.kafka010.consumer
 
 import java.{util => ju}
 
@@ -26,7 +26,8 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 
 import org.apache.spark.SparkConf
-import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+import org.apache.spark.sql.kafka010.{CONSUMER_CACHE_CAPACITY, CONSUMER_CACHE_EVICTOR_THREAD_RUN_INTERVAL, CONSUMER_CACHE_TIMEOUT}
+import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.CacheKey
 import org.apache.spark.sql.test.SharedSparkSession
 
 class InternalKafkaConsumerPoolSuite extends SharedSparkSession {
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDataConsumerSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumerSuite.scala
similarity index 98%
rename from external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDataConsumerSuite.scala
rename to external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumerSuite.scala
index d22955180d..c607c4fc81 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDataConsumerSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumerSuite.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.kafka010
+package org.apache.spark.sql.kafka010.consumer
 
 import java.{util => ju}
 import java.nio.charset.StandardCharsets
@@ -32,7 +32,8 @@ import org.scalatest.PrivateMethodTester
 
 import org.apache.spark.{TaskContext, TaskContextImpl}
 import org.apache.spark.kafka010.KafkaDelegationTokenTest
-import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+import org.apache.spark.sql.kafka010.{KafkaTestUtils, RecordBuilder}
+import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.CacheKey
 import org.apache.spark.sql.test.SharedSparkSession
 
 class KafkaDataConsumerSuite
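With the classes relocated, callers outside the new subpackage reach them through `org.apache.spark.sql.kafka010.consumer`, as the import changes throughout this diff show. Below is a hypothetical caller sketch; it assumes the KafkaDataConsumer companion object keeps an `acquire(...)` entry point and the instance a `release()` method, as the reader classes touched above use them.

```scala
package org.apache.spark.sql.kafka010

import java.{util => ju}

import org.apache.kafka.common.TopicPartition

// After this refactoring, code in the parent package imports the consumer
// machinery from the new subpackage, just as KafkaSourceRDD does above.
import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer

object ConsumerUsageSketch {
  // Hypothetical illustration of the acquire/use/release lifecycle.
  def readPartition(tp: TopicPartition, kafkaParams: ju.Map[String, Object]): Unit = {
    val consumer = KafkaDataConsumer.acquire(tp, kafkaParams)
    try {
      // ... fetch records through the consumer ...
    } finally {
      consumer.release() // return pooled resources when the task is done
    }
  }
}
```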