[SPARK-25151][SS] Apply Apache Commons Pool to KafkaDataConsumer

## What changes were proposed in this pull request?

This patch introduces pooling for both Kafka consumers and fetched data. The overall benefits of the patch are as follows:

* Both pools support eviction of idle objects, which helps close invalid idle objects whose topic partitions are no longer assigned to any task.
* It also enables applying a different policy to each pool, which helps optimize pooling for each of them.
* We were concerned about multiple tasks pointing to the same topic partition with the same group ID; the existing code can't handle this, so excess seeks and fetches could happen. This patch handles that case properly.
* It also makes the code always safe to leverage the cache, so there is no need to maintain the reuseCache parameter.

Moreover, pooling of Kafka consumers is implemented on top of Apache Commons Pool, which gives a couple of additional benefits:

* We can get rid of synchronization on the KafkaDataConsumer object while acquiring and returning an InternalKafkaConsumer.
* We can extract the object pooling logic out of the class, so that the behavior of the pool can be tested easily.
* We can get various statistics for the object pool, and can also enable JMX for the pool.

FetchedData instances are pooled by a custom pool implementation instead of Apache Commons Pool, because they are keyed by CacheKey as the first key and a "desired offset" as the second key, and the "desired offset" changes over time - I haven't found any general pool implementation supporting this.

This patch adds a new dependency, Apache Commons Pool 2.6.0, to the `spark-sql-kafka-0-10` module.
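
For readers unfamiliar with Apache Commons Pool, below is a minimal, self-contained sketch of keyed object pooling with `GenericKeyedObjectPool`. It is not the patch's code; the string key and `StubConsumer` class are placeholders used only for illustration:

```
// A minimal sketch of keyed pooling with Apache Commons Pool 2 (illustrative only;
// the key type and StubConsumer class are placeholders, not the patch's classes).
import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, PooledObject}
import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}

class StubConsumer(val key: String) {
  def close(): Unit = ()
}

class StubConsumerFactory extends BaseKeyedPooledObjectFactory[String, StubConsumer] {
  override def create(key: String): StubConsumer = new StubConsumer(key)
  override def wrap(value: StubConsumer): PooledObject[StubConsumer] =
    new DefaultPooledObject(value)
  override def destroyObject(key: String, p: PooledObject[StubConsumer]): Unit =
    p.getObject.close()
}

object KeyedPoolSketch {
  def main(args: Array[String]): Unit = {
    val config = new GenericKeyedObjectPoolConfig[StubConsumer]()
    config.setMaxTotalPerKey(-1)                          // no hard per-key cap (soft capacity only)
    config.setMinEvictableIdleTimeMillis(5 * 60 * 1000L)  // evict objects idle for more than 5 minutes
    config.setTimeBetweenEvictionRunsMillis(60 * 1000L)   // run the idle evictor every minute

    val pool = new GenericKeyedObjectPool[String, StubConsumer](new StubConsumerFactory, config)
    val consumer = pool.borrowObject("groupId-topic-0")
    try {
      // use the borrowed consumer here ...
    } finally {
      // return it if healthy, or call invalidateObject(...) if it should be destroyed
      pool.returnObject("groupId-topic-0", consumer)
    }
    pool.close()
  }
}
```

The patch follows the same borrow/return contract, keyed by `CacheKey` (group ID plus topic partition), with `InternalKafkaConsumer` as the pooled object.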

## How was this patch tested?

Existing unit tests as well as new tests for the object pools.

Also ran an experiment to verify the behavior under concurrent access to consumers for the same topic partition.

* Made changes on both sides (master and patch) to log when a Kafka consumer is created and when records are fetched from Kafka.
* branches
  * master: https://github.com/HeartSaVioR/spark/tree/SPARK-25151-master-ref-debugging
  * patch: https://github.com/HeartSaVioR/spark/tree/SPARK-25151-debugging
* Test query (doing self-join)
  * https://gist.github.com/HeartSaVioR/d831974c3f25c02846f4b15b8d232cc2
* Ran the query from spark-shell, using `local[*]` to maximize the chance of concurrent access
* Collected the count of created Kafka consumers via command: `grep "creating new Kafka consumer" logfile | wc -l`
* Collected the count of fetch requests to Kafka via command: `grep "fetching data from Kafka consumer" logfile | wc -l`

Topic and data distribution is as follows:

```
truck_speed_events_stream_spark_25151_v1:0:99440
truck_speed_events_stream_spark_25151_v1:1:99489
truck_speed_events_stream_spark_25151_v1:2:397759
truck_speed_events_stream_spark_25151_v1:3:198917
truck_speed_events_stream_spark_25151_v1:4:99484
truck_speed_events_stream_spark_25151_v1:5:497320
truck_speed_events_stream_spark_25151_v1:6:99430
truck_speed_events_stream_spark_25151_v1:7:397887
truck_speed_events_stream_spark_25151_v1:8:397813
truck_speed_events_stream_spark_25151_v1:9:0
```

The experiment used only the 4 smallest partitions (0, 1, 4, 6) of these so that the query would finish earlier.

The result of the experiment is below:

branch | Kafka consumers created | fetch requests
-- | -- | --
master | 1986 | 2837
patch | 8 | 1706

Closes #22138 from HeartSaVioR/SPARK-25151.

Lead-authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Co-authored-by: Jungtaek Lim <kabhwan@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>

@ -430,20 +430,75 @@ The following configurations are optional:
### Consumer Caching
It's time-consuming to initialize Kafka consumers, especially in streaming scenarios where processing time is a key factor.
Because of this, Spark caches Kafka consumers on executors. The caching key is built up from the following information:
Because of this, Spark pools Kafka consumers on executors, by leveraging Apache Commons Pool.
The caching key is built up from the following information:
* Topic name
* Topic partition
* Group ID
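For illustration, this key corresponds to the `CacheKey` introduced by the patch, where the topic name and partition are carried together in a `TopicPartition`. A minimal sketch (spark-shell style; the group ID and topic names below are hypothetical):

```
// Sketch of the caching key: (group ID, topic, partition), mirroring the patch's CacheKey.
import org.apache.kafka.common.TopicPartition

case class CacheKey(groupId: String, topicPartition: TopicPartition)

val key = CacheKey("spark-kafka-source-example-group", new TopicPartition("events", 0))
```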
The size of the cache is limited by <code>spark.kafka.consumer.cache.capacity</code> (default: 64).
If this threshold is reached, it tries to remove the least-used entry that is currently not in use.
If it cannot be removed, then the cache will keep growing. In the worst case, the cache will grow to
the max number of concurrent tasks that can run in the executor (that is, number of tasks slots),
after which it will never reduce.
The following properties are available to configure the consumer pool:
If a task fails for any reason the new task is executed with a newly created Kafka consumer for safety reasons.
At the same time the cached Kafka consumer which was used in the failed execution will be invalidated. Here it has to
be emphasized it will not be closed if any other task is using it.
<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
<tr>
<td>spark.kafka.consumer.cache.capacity</td>
<td>64</td>
<td>The maximum number of consumers cached. Please note that it's a soft limit.</td>
</tr>
<tr>
<td>spark.kafka.consumer.cache.timeout</td>
<td>5m (5 minutes)</td>
<td>The minimum amount of time a consumer may sit idle in the pool before it is eligible for eviction by the evictor.</td>
</tr>
<tr>
<td>spark.kafka.consumer.cache.evictorThreadRunInterval</td>
<td>1m (1 minute)</td>
<td>The interval of time between runs of the idle evictor thread for the consumer pool. When non-positive, no idle evictor thread will be run.</td>
</tr>
<tr>
<td>spark.kafka.consumer.cache.jmx.enable</td>
<td>false</td>
<td>Enable or disable JMX for pools created with this configuration instance. Statistics of the pool are available via the JMX instance.
The prefix of the JMX name is set to "kafka010-cached-simple-kafka-consumer-pool".
</td>
</tr>
</table>
The size of the pool is limited by <code>spark.kafka.consumer.cache.capacity</code>,
but it works as a "soft limit" so that Spark tasks are not blocked.
The idle eviction thread periodically removes consumers that have not been used for longer than the given timeout.
If this threshold is reached when borrowing, it tries to remove the least-used entry that is currently not in use.
If it cannot be removed, then the pool will keep growing. In the worst case, the pool will grow to
the max number of concurrent tasks that can run in the executor (that is, the number of task slots).
If a task fails for any reason, the new task is executed with a newly created Kafka consumer for safety reasons.
At the same time, all consumers in the pool that have the same caching key are invalidated, to remove the consumer
that was used in the failed execution. Consumers that other tasks are still using will not be closed, but will be
invalidated as well when they are returned to the pool.
Along with consumers, Spark pools the records fetched from Kafka separately, to keep Kafka consumers stateless from
Spark's point of view and to maximize the efficiency of pooling. It uses the same cache key as the Kafka consumer pool.
Note that it doesn't leverage Apache Commons Pool due to its different characteristics.
The following properties are available to configure the fetched data pool:
<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
<tr>
<td>spark.kafka.consumer.fetchedData.cache.timeout</td>
<td>5m (5 minutes)</td>
<td>The minimum amount of time fetched data may sit idle in the pool before it is eligible for eviction by the evictor.</td>
</tr>
<tr>
<td>spark.kafka.consumer.fetchedData.cache.evictorThreadRunInterval</td>
<td>1m (1 minute)</td>
<td>The interval of time between runs of the idle evictor thread for the fetched data pool. When non-positive, no idle evictor thread will be run.</td>
</tr>
</table>
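As a usage sketch (not part of the patch), these pool settings could be supplied while building the Spark application in a driver program, or equivalently via `--conf` when submitting. The property names come from the tables above; the values are illustrative assumptions:

```
// Illustrative only: tuning the consumer and fetched-data pools via Spark configuration.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("kafka-consumer-pool-tuning")
  .config("spark.kafka.consumer.cache.capacity", "128")
  .config("spark.kafka.consumer.cache.timeout", "10m")
  .config("spark.kafka.consumer.cache.evictorThreadRunInterval", "1m")
  .config("spark.kafka.consumer.fetchedData.cache.timeout", "10m")
  .getOrCreate()
```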
## Writing Data to Kafka

@ -72,6 +72,11 @@
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>${commons-pool2.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
@ -129,6 +134,11 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.jmock</groupId>
<artifactId>jmock-junit4</artifactId>
<scope>test</scope>
</dependency>
<!--
This spark-tags test-dep is needed even though it isn't used in this module, otherwise testing-cmds that exclude

@ -0,0 +1,183 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.kafka010
import java.{util => ju}
import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeUnit}
import java.util.concurrent.atomic.LongAdder
import scala.collection.mutable
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.kafka010.KafkaDataConsumer.{CacheKey, UNKNOWN_OFFSET}
import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
/**
* Provides object pool for [[FetchedData]] which is grouped by [[CacheKey]].
*
* Along with CacheKey, it receives desired start offset to find cached FetchedData which
* may be stored from previous batch. If it can't find one to match, it will create
* a new FetchedData. As "desired start offset" plays as second level of key which can be
* modified in same instance, this class cannot be replaced with general pool implementations
* including Apache Commons Pool which pools KafkaConsumer.
*/
private[kafka010] class FetchedDataPool(
executorService: ScheduledExecutorService,
clock: Clock,
conf: SparkConf) extends Logging {
import FetchedDataPool._
def this(sparkConf: SparkConf) = {
this(
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
"kafka-fetched-data-cache-evictor"), new SystemClock, sparkConf)
}
private val cache: mutable.Map[CacheKey, CachedFetchedDataList] = mutable.HashMap.empty
private val minEvictableIdleTimeMillis = conf.get(FETCHED_DATA_CACHE_TIMEOUT)
private val evictorThreadRunIntervalMillis =
conf.get(FETCHED_DATA_CACHE_EVICTOR_THREAD_RUN_INTERVAL)
private def startEvictorThread(): Option[ScheduledFuture[_]] = {
if (evictorThreadRunIntervalMillis > 0) {
val future = executorService.scheduleAtFixedRate(() => {
Utils.tryLogNonFatalError(removeIdleFetchedData())
}, 0, evictorThreadRunIntervalMillis, TimeUnit.MILLISECONDS)
Some(future)
} else {
None
}
}
private var scheduled = startEvictorThread()
private val numCreatedFetchedData = new LongAdder()
private val numTotalElements = new LongAdder()
def numCreated: Long = numCreatedFetchedData.sum()
def numTotal: Long = numTotalElements.sum()
def acquire(key: CacheKey, desiredStartOffset: Long): FetchedData = synchronized {
val fetchedDataList = cache.getOrElseUpdate(key, new CachedFetchedDataList())
val cachedFetchedDataOption = fetchedDataList.find { p =>
!p.inUse && p.getObject.nextOffsetInFetchedData == desiredStartOffset
}
var cachedFetchedData: CachedFetchedData = null
if (cachedFetchedDataOption.isDefined) {
cachedFetchedData = cachedFetchedDataOption.get
} else {
cachedFetchedData = CachedFetchedData.empty()
fetchedDataList += cachedFetchedData
numCreatedFetchedData.increment()
numTotalElements.increment()
}
cachedFetchedData.lastAcquiredTimestamp = clock.getTimeMillis()
cachedFetchedData.inUse = true
cachedFetchedData.getObject
}
def invalidate(key: CacheKey): Unit = synchronized {
cache.remove(key) match {
case Some(lst) => numTotalElements.add(-1 * lst.size)
case None =>
}
}
def release(key: CacheKey, fetchedData: FetchedData): Unit = synchronized {
def warnReleasedDataNotInPool(key: CacheKey, fetchedData: FetchedData): Unit = {
logWarning(s"No matching data in pool for $fetchedData in key $key. " +
"It might be released before, or it was not a part of pool.")
}
cache.get(key) match {
case Some(fetchedDataList) =>
val cachedFetchedDataOption = fetchedDataList.find { p =>
p.inUse && p.getObject == fetchedData
}
if (cachedFetchedDataOption.isEmpty) {
warnReleasedDataNotInPool(key, fetchedData)
} else {
val cachedFetchedData = cachedFetchedDataOption.get
cachedFetchedData.inUse = false
cachedFetchedData.lastReleasedTimestamp = clock.getTimeMillis()
}
case None =>
warnReleasedDataNotInPool(key, fetchedData)
}
}
def shutdown(): Unit = {
ThreadUtils.shutdown(executorService)
}
def reset(): Unit = synchronized {
scheduled.foreach(_.cancel(true))
cache.clear()
numTotalElements.reset()
numCreatedFetchedData.reset()
scheduled = startEvictorThread()
}
private def removeIdleFetchedData(): Unit = synchronized {
val now = clock.getTimeMillis()
val maxAllowedReleasedTimestamp = now - minEvictableIdleTimeMillis
cache.values.foreach { p: CachedFetchedDataList =>
val expired = p.filter { q =>
!q.inUse && q.lastReleasedTimestamp < maxAllowedReleasedTimestamp
}
p --= expired
numTotalElements.add(-1 * expired.size)
}
}
}
private[kafka010] object FetchedDataPool {
private[kafka010] case class CachedFetchedData(fetchedData: FetchedData) {
var lastReleasedTimestamp: Long = Long.MaxValue
var lastAcquiredTimestamp: Long = Long.MinValue
var inUse: Boolean = false
def getObject: FetchedData = fetchedData
}
private object CachedFetchedData {
def empty(): CachedFetchedData = {
val emptyData = FetchedData(
ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
UNKNOWN_OFFSET,
UNKNOWN_OFFSET)
CachedFetchedData(emptyData)
}
}
private[kafka010] type CachedFetchedDataList = mutable.ListBuffer[CachedFetchedData]
}

@ -0,0 +1,221 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.kafka010
import java.{util => ju}
import java.util.concurrent.ConcurrentHashMap
import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, PooledObject, SwallowedExceptionListener}
import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._
import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
/**
* Provides object pool for [[InternalKafkaConsumer]] which is grouped by [[CacheKey]].
*
* This class leverages [[GenericKeyedObjectPool]] internally, hence providing methods based on
* the class, and same contract applies: after using the borrowed object, you must either call
* returnObject() if the object is healthy to return to pool, or invalidateObject() if the object
* should be destroyed.
*
* The soft capacity of pool is determined by "spark.kafka.consumer.cache.capacity" config value,
* and the pool will have reasonable default value if the value is not provided.
* (The instance will do its best effort to respect soft capacity but it can exceed when there's
* a borrowing request and there's neither free space nor idle object to clear.)
*
* This class guarantees that no caller will get pooled object once the object is borrowed and
* not yet returned, hence provide thread-safety usage of non-thread-safe [[InternalKafkaConsumer]]
* unless caller shares the object to multiple threads.
*/
private[kafka010] class InternalKafkaConsumerPool(
objectFactory: ObjectFactory,
poolConfig: PoolConfig) extends Logging {
def this(conf: SparkConf) = {
this(new ObjectFactory, new PoolConfig(conf))
}
// the class is intended to have only soft capacity
assert(poolConfig.getMaxTotal < 0)
private val pool = {
val internalPool = new GenericKeyedObjectPool[CacheKey, InternalKafkaConsumer](
objectFactory, poolConfig)
internalPool.setSwallowedExceptionListener(CustomSwallowedExceptionListener)
internalPool
}
/**
* Borrows [[InternalKafkaConsumer]] object from the pool. If there's no idle object for the key,
* the pool will create the [[InternalKafkaConsumer]] object.
*
* If the pool doesn't have idle object for the key and also exceeds the soft capacity,
* pool will try to clear some of idle objects.
*
* Borrowed object must be returned by either calling returnObject or invalidateObject, otherwise
* the object will be kept in pool as active object.
*/
def borrowObject(key: CacheKey, kafkaParams: ju.Map[String, Object]): InternalKafkaConsumer = {
updateKafkaParamForKey(key, kafkaParams)
if (size >= poolConfig.softMaxSize) {
logWarning("Pool exceeds its soft max size, cleaning up idle objects...")
pool.clearOldest()
}
pool.borrowObject(key)
}
/** Returns borrowed object to the pool. */
def returnObject(consumer: InternalKafkaConsumer): Unit = {
pool.returnObject(extractCacheKey(consumer), consumer)
}
/** Invalidates (destroy) borrowed object to the pool. */
def invalidateObject(consumer: InternalKafkaConsumer): Unit = {
pool.invalidateObject(extractCacheKey(consumer), consumer)
}
/** Invalidates all idle consumers for the key */
def invalidateKey(key: CacheKey): Unit = {
pool.clear(key)
}
/**
* Closes the keyed object pool. Once the pool is closed,
* borrowObject will fail with [[IllegalStateException]], but returnObject and invalidateObject
* will continue to work, with returned objects destroyed on return.
*
* Also destroys idle instances in the pool.
*/
def close(): Unit = {
pool.close()
}
def reset(): Unit = {
// this is the best-effort of clearing up. otherwise we should close the pool and create again
// but we don't want to make it "var" only because of tests.
pool.clear()
}
def numIdle: Int = pool.getNumIdle
def numIdle(key: CacheKey): Int = pool.getNumIdle(key)
def numActive: Int = pool.getNumActive
def numActive(key: CacheKey): Int = pool.getNumActive(key)
def size: Int = numIdle + numActive
def size(key: CacheKey): Int = numIdle(key) + numActive(key)
// TODO: revisit the relation between CacheKey and kafkaParams - for now it looks a bit weird
// as we force all consumers having same (groupId, topicPartition) to have same kafkaParams
// which might be viable in performance perspective (kafkaParams might be too huge to use
// as a part of key), but there might be the case kafkaParams could be different -
// cache key should be differentiated for both kafkaParams.
private def updateKafkaParamForKey(key: CacheKey, kafkaParams: ju.Map[String, Object]): Unit = {
// We can assume that kafkaParam should not be different for same cache key,
// otherwise we can't reuse the cached object and cache key should contain kafkaParam.
// So it should be safe to put the key/value pair only when the key doesn't exist.
val oldKafkaParams = objectFactory.keyToKafkaParams.putIfAbsent(key, kafkaParams)
require(oldKafkaParams == null || kafkaParams == oldKafkaParams, "Kafka parameters for same " +
s"cache key should be equal. old parameters: $oldKafkaParams new parameters: $kafkaParams")
}
private def extractCacheKey(consumer: InternalKafkaConsumer): CacheKey = {
new CacheKey(consumer.topicPartition, consumer.kafkaParams)
}
}
private[kafka010] object InternalKafkaConsumerPool {
object CustomSwallowedExceptionListener extends SwallowedExceptionListener with Logging {
override def onSwallowException(e: Exception): Unit = {
logError(s"Error closing Kafka consumer", e)
}
}
class PoolConfig(conf: SparkConf) extends GenericKeyedObjectPoolConfig[InternalKafkaConsumer] {
private var _softMaxSize = Int.MaxValue
def softMaxSize: Int = _softMaxSize
init()
def init(): Unit = {
_softMaxSize = conf.get(CONSUMER_CACHE_CAPACITY)
val jmxEnabled = conf.get(CONSUMER_CACHE_JMX_ENABLED)
val minEvictableIdleTimeMillis = conf.get(CONSUMER_CACHE_TIMEOUT)
val evictorThreadRunIntervalMillis = conf.get(
CONSUMER_CACHE_EVICTOR_THREAD_RUN_INTERVAL)
// NOTE: Below lines define the behavior, so do not modify unless you know what you are
// doing, and update the class doc accordingly if necessary when you modify.
// 1. Set min idle objects per key to 0 to avoid creating unnecessary object.
// 2. Set max idle objects per key to 3 but set total objects per key to infinite
// which ensures borrowing per key is not restricted.
// 3. Set max total objects to infinite which ensures all objects are managed in this pool.
setMinIdlePerKey(0)
setMaxIdlePerKey(3)
setMaxTotalPerKey(-1)
setMaxTotal(-1)
// Set minimum evictable idle time which will be referred from evictor thread
setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis)
setSoftMinEvictableIdleTimeMillis(-1)
// evictor thread will run test with ten idle objects
setTimeBetweenEvictionRunsMillis(evictorThreadRunIntervalMillis)
setNumTestsPerEvictionRun(10)
setEvictionPolicy(new DefaultEvictionPolicy[InternalKafkaConsumer]())
// Immediately fail on exhausted pool while borrowing
setBlockWhenExhausted(false)
setJmxEnabled(jmxEnabled)
setJmxNamePrefix("kafka010-cached-simple-kafka-consumer-pool")
}
}
class ObjectFactory extends BaseKeyedPooledObjectFactory[CacheKey, InternalKafkaConsumer] {
val keyToKafkaParams = new ConcurrentHashMap[CacheKey, ju.Map[String, Object]]()
override def create(key: CacheKey): InternalKafkaConsumer = {
Option(keyToKafkaParams.get(key)) match {
case Some(kafkaParams) => new InternalKafkaConsumer(key.topicPartition, kafkaParams)
case None => throw new IllegalStateException("Kafka params should be set before " +
"borrowing object.")
}
}
override def wrap(value: InternalKafkaConsumer): PooledObject[InternalKafkaConsumer] = {
new DefaultPooledObject[InternalKafkaConsumer](value)
}
override def destroyObject(key: CacheKey, p: PooledObject[InternalKafkaConsumer]): Unit = {
p.getObject.close()
}
}
}

@ -91,7 +91,7 @@ private[kafka010] class KafkaBatch(
KafkaSourceProvider.kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId)
offsetRanges.map { range =>
new KafkaBatchInputPartition(
range, executorKafkaParams, pollTimeoutMs, failOnDataLoss, false)
range, executorKafkaParams, pollTimeoutMs, failOnDataLoss)
}.toArray
}

@ -30,14 +30,13 @@ private[kafka010] case class KafkaBatchInputPartition(
offsetRange: KafkaOffsetRange,
executorKafkaParams: ju.Map[String, Object],
pollTimeoutMs: Long,
failOnDataLoss: Boolean,
reuseKafkaConsumer: Boolean) extends InputPartition
failOnDataLoss: Boolean) extends InputPartition
private[kafka010] object KafkaBatchReaderFactory extends PartitionReaderFactory {
override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
val p = partition.asInstanceOf[KafkaBatchInputPartition]
KafkaBatchPartitionReader(p.offsetRange, p.executorKafkaParams, p.pollTimeoutMs,
p.failOnDataLoss, p.reuseKafkaConsumer)
p.failOnDataLoss)
}
}
@ -46,11 +45,9 @@ private case class KafkaBatchPartitionReader(
offsetRange: KafkaOffsetRange,
executorKafkaParams: ju.Map[String, Object],
pollTimeoutMs: Long,
failOnDataLoss: Boolean,
reuseKafkaConsumer: Boolean) extends PartitionReader[InternalRow] with Logging {
failOnDataLoss: Boolean) extends PartitionReader[InternalRow] with Logging {
private val consumer = KafkaDataConsumer.acquire(
offsetRange.topicPartition, executorKafkaParams, reuseKafkaConsumer)
private val consumer = KafkaDataConsumer.acquire(offsetRange.topicPartition, executorKafkaParams)
private val rangeToRead = resolveRange(offsetRange)
private val converter = new KafkaRecordToUnsafeRowConverter

@ -185,7 +185,7 @@ class KafkaContinuousPartitionReader(
kafkaParams: ju.Map[String, Object],
pollTimeoutMs: Long,
failOnDataLoss: Boolean) extends ContinuousPartitionReader[InternalRow] {
private val consumer = KafkaDataConsumer.acquire(topicPartition, kafkaParams, useCache = false)
private val consumer = KafkaDataConsumer.acquire(topicPartition, kafkaParams)
private val converter = new KafkaRecordToUnsafeRowConverter
private var nextKafkaOffset = startOffset

@ -18,6 +18,7 @@
package org.apache.spark.sql.kafka010
import java.{util => ju}
import java.io.Closeable
import java.util.concurrent.TimeoutException
import scala.collection.JavaConverters._
@ -25,14 +26,211 @@ import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, OffsetOutOfRangeException}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.{SparkEnv, SparkException, TaskContext}
import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.kafka010.KafkaConfigUpdater
import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
import org.apache.spark.sql.kafka010.KafkaDataConsumer.{AvailableOffsetRange, UNKNOWN_OFFSET}
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
import org.apache.spark.util.UninterruptibleThread
import org.apache.spark.util.{ShutdownHookManager, UninterruptibleThread}
/**
* This class simplifies the usages of Kafka consumer in Spark SQL Kafka connector.
*
* NOTE: Like KafkaConsumer, this class is not thread-safe.
* NOTE for contributors: It is possible for the instance to be used from multiple callers,
* so all the methods should not rely on current cursor and use seek manually.
*/
private[kafka010] class InternalKafkaConsumer(
val topicPartition: TopicPartition,
val kafkaParams: ju.Map[String, Object]) extends Closeable with Logging {
val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
private val consumer = createConsumer()
/**
* Poll messages from Kafka starting from `offset` and returns a pair of "list of consumer record"
* and "offset after poll". The list of consumer record may be empty if the Kafka consumer fetches
* some messages but all of them are not visible messages (either transaction messages,
* or aborted messages when `isolation.level` is `read_committed`).
*
* @throws OffsetOutOfRangeException if `offset` is out of range.
* @throws TimeoutException if the consumer position is not changed after polling. It means the
* consumer polls nothing before timeout.
*/
def fetch(offset: Long, pollTimeoutMs: Long):
(ju.List[ConsumerRecord[Array[Byte], Array[Byte]]], Long) = {
// Seek to the offset because we may call seekToBeginning or seekToEnd before this.
seek(offset)
val p = consumer.poll(pollTimeoutMs)
val r = p.records(topicPartition)
logDebug(s"Polled $groupId ${p.partitions()} ${r.size}")
val offsetAfterPoll = consumer.position(topicPartition)
logDebug(s"Offset changed from $offset to $offsetAfterPoll after polling")
val fetchedData = (r, offsetAfterPoll)
if (r.isEmpty) {
// We cannot fetch anything after `poll`. Three possible cases:
// - `offset` is out of range so that Kafka returns nothing. `OffsetOutOfRangeException` will
// be thrown.
// - Cannot fetch any data before timeout. `TimeoutException` will be thrown.
// - Fetched something but all of the records are invisible. This is a valid case and we let
// the caller handle it.
val range = getAvailableOffsetRange()
if (offset < range.earliest || offset >= range.latest) {
throw new OffsetOutOfRangeException(
Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
} else if (offset == offsetAfterPoll) {
throw new TimeoutException(
s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds")
}
}
fetchedData
}
/**
* Return the available offset range of the current partition. It's a pair of the earliest offset
* and the latest offset.
*/
def getAvailableOffsetRange(): AvailableOffsetRange = {
consumer.seekToBeginning(Set(topicPartition).asJava)
val earliestOffset = consumer.position(topicPartition)
consumer.seekToEnd(Set(topicPartition).asJava)
val latestOffset = consumer.position(topicPartition)
AvailableOffsetRange(earliestOffset, latestOffset)
}
override def close(): Unit = {
consumer.close()
}
/** Create a KafkaConsumer to fetch records for `topicPartition` */
private def createConsumer(): KafkaConsumer[Array[Byte], Array[Byte]] = {
val updatedKafkaParams = KafkaConfigUpdater("executor", kafkaParams.asScala.toMap)
.setAuthenticationConfigIfNeeded()
.build()
val c = new KafkaConsumer[Array[Byte], Array[Byte]](updatedKafkaParams)
val tps = new ju.ArrayList[TopicPartition]()
tps.add(topicPartition)
c.assign(tps)
c
}
private def seek(offset: Long): Unit = {
logDebug(s"Seeking to $groupId $topicPartition $offset")
consumer.seek(topicPartition, offset)
}
}
// TODO: consider changing this to normal class, as having mutable variables in
// case class sounds weird.
/**
* The internal object to store the fetched data from Kafka consumer and the next offset to poll.
*
* @param _records the pre-fetched Kafka records.
* @param _nextOffsetInFetchedData the next offset in `records`. We use this to verify if we
* should check if the pre-fetched data is still valid.
* @param _offsetAfterPoll the Kafka offset after calling `poll`. We will use this offset to
* poll when `records` is drained.
*/
private[kafka010] case class FetchedData(
private var _records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
private var _nextOffsetInFetchedData: Long,
private var _offsetAfterPoll: Long) {
def withNewPoll(
records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
offsetAfterPoll: Long): FetchedData = {
this._records = records
this._nextOffsetInFetchedData = UNKNOWN_OFFSET
this._offsetAfterPoll = offsetAfterPoll
this
}
/** Whether there are more elements */
def hasNext: Boolean = _records.hasNext
/** Move `records` forward and return the next record. */
def next(): ConsumerRecord[Array[Byte], Array[Byte]] = {
val record = _records.next()
_nextOffsetInFetchedData = record.offset + 1
record
}
/** Move `records` backward and return the previous record. */
def previous(): ConsumerRecord[Array[Byte], Array[Byte]] = {
assert(_records.hasPrevious, "fetchedData cannot move back")
val record = _records.previous()
_nextOffsetInFetchedData = record.offset
record
}
/** Reset the internal pre-fetched data. */
def reset(): Unit = {
_records = ju.Collections.emptyListIterator()
_nextOffsetInFetchedData = UNKNOWN_OFFSET
_offsetAfterPoll = UNKNOWN_OFFSET
}
/**
* Returns the next offset in `records`. We use this to verify if we should check if the
* pre-fetched data is still valid.
*/
def nextOffsetInFetchedData: Long = _nextOffsetInFetchedData
/**
* Returns the next offset to poll after draining the pre-fetched records.
*/
def offsetAfterPoll: Long = _offsetAfterPoll
}
// TODO: consider changing this to normal class, as having mutable variables in
// case class sounds weird.
/**
* The internal object returned by the `fetchRecord` method. If `record` is empty, it means it is
* invisible (either a transaction message, or an aborted message when the consumer's
* `isolation.level` is `read_committed`), and the caller should use `nextOffsetToFetch` to fetch
* instead.
*/
private[kafka010] case class FetchedRecord(
var record: ConsumerRecord[Array[Byte], Array[Byte]],
var nextOffsetToFetch: Long) {
def withRecord(
record: ConsumerRecord[Array[Byte], Array[Byte]],
nextOffsetToFetch: Long): FetchedRecord = {
this.record = record
this.nextOffsetToFetch = nextOffsetToFetch
this
}
}
/**
* This class helps caller to read from Kafka leveraging consumer pool as well as fetched data pool.
* This class throws error when data loss is detected while reading from Kafka.
*
* NOTE for contributors: we need to ensure all the public methods to initialize necessary resources
* via calling `getOrRetrieveConsumer` and `getOrRetrieveFetchedData`.
*/
private[kafka010] class KafkaDataConsumer(
topicPartition: TopicPartition,
kafkaParams: ju.Map[String, Object],
consumerPool: InternalKafkaConsumerPool,
fetchedDataPool: FetchedDataPool) extends Logging {
import KafkaDataConsumer._
@volatile private[kafka010] var _consumer: Option[InternalKafkaConsumer] = None
@volatile private var _fetchedData: Option[FetchedData] = None
private val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
private val cacheKey = CacheKey(groupId, topicPartition)
/**
* The fetched record returned from the `fetchRecord` method. This is a reusable private object to
* avoid memory allocation.
*/
private val fetchedRecord: FetchedRecord = FetchedRecord(null, UNKNOWN_OFFSET)
private[kafka010] sealed trait KafkaDataConsumer {
/**
* Get the record for the given offset if available.
*
@ -57,179 +255,6 @@ private[kafka010] sealed trait KafkaDataConsumer {
* the next earliest available record less than untilOffset, or null. It
* will not throw any exception.
*/
def get(
offset: Long,
untilOffset: Long,
pollTimeoutMs: Long,
failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
internalConsumer.get(offset, untilOffset, pollTimeoutMs, failOnDataLoss)
}
/**
* Return the available offset range of the current partition. It's a pair of the earliest offset
* and the latest offset.
*/
def getAvailableOffsetRange(): AvailableOffsetRange = internalConsumer.getAvailableOffsetRange()
/**
* Release this consumer from being further used. Depending on its implementation,
* this consumer will be either finalized, or reset for reuse later.
*/
def release(): Unit
/** Reference to the internal implementation that this wrapper delegates to */
def internalConsumer: InternalKafkaConsumer
}
/**
* A wrapper around Kafka's KafkaConsumer that throws error when data loss is detected.
* This is not for direct use outside this file.
*/
private[kafka010] case class InternalKafkaConsumer(
topicPartition: TopicPartition,
kafkaParams: ju.Map[String, Object]) extends Logging {
import InternalKafkaConsumer._
/**
* The internal object to store the fetched data from Kafka consumer and the next offset to poll.
*
* @param _records the pre-fetched Kafka records.
* @param _nextOffsetInFetchedData the next offset in `records`. We use this to verify if we
* should check if the pre-fetched data is still valid.
* @param _offsetAfterPoll the Kafka offset after calling `poll`. We will use this offset to
* poll when `records` is drained.
*/
private case class FetchedData(
private var _records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
private var _nextOffsetInFetchedData: Long,
private var _offsetAfterPoll: Long) {
def withNewPoll(
records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
offsetAfterPoll: Long): FetchedData = {
this._records = records
this._nextOffsetInFetchedData = UNKNOWN_OFFSET
this._offsetAfterPoll = offsetAfterPoll
this
}
/** Whether there are more elements */
def hasNext: Boolean = _records.hasNext
/** Move `records` forward and return the next record. */
def next(): ConsumerRecord[Array[Byte], Array[Byte]] = {
val record = _records.next()
_nextOffsetInFetchedData = record.offset + 1
record
}
/** Move `records` backward and return the previous record. */
def previous(): ConsumerRecord[Array[Byte], Array[Byte]] = {
assert(_records.hasPrevious, "fetchedData cannot move back")
val record = _records.previous()
_nextOffsetInFetchedData = record.offset
record
}
/** Reset the internal pre-fetched data. */
def reset(): Unit = {
_records = ju.Collections.emptyListIterator()
_nextOffsetInFetchedData = UNKNOWN_OFFSET
_offsetAfterPoll = UNKNOWN_OFFSET
}
/**
* Returns the next offset in `records`. We use this to verify if we should check if the
* pre-fetched data is still valid.
*/
def nextOffsetInFetchedData: Long = _nextOffsetInFetchedData
/**
* Returns the next offset to poll after draining the pre-fetched records.
*/
def offsetAfterPoll: Long = _offsetAfterPoll
}
/**
* The internal object returned by the `fetchRecord` method. If `record` is empty, it means it is
* invisible (either a transaction message, or an aborted message when the consumer's
* `isolation.level` is `read_committed`), and the caller should use `nextOffsetToFetch` to fetch
* instead.
*/
private case class FetchedRecord(
var record: ConsumerRecord[Array[Byte], Array[Byte]],
var nextOffsetToFetch: Long) {
def withRecord(
record: ConsumerRecord[Array[Byte], Array[Byte]],
nextOffsetToFetch: Long): FetchedRecord = {
this.record = record
this.nextOffsetToFetch = nextOffsetToFetch
this
}
}
private val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
@volatile private var consumer = createConsumer
/** indicates whether this consumer is in use or not */
@volatile var inUse = true
/** indicate whether this consumer is going to be stopped in the next release */
@volatile var markedForClose = false
/**
* The fetched data returned from Kafka consumer. This is a reusable private object to avoid
* memory allocation.
*/
private val fetchedData = FetchedData(
ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
UNKNOWN_OFFSET,
UNKNOWN_OFFSET)
/**
* The fetched record returned from the `fetchRecord` method. This is a reusable private object to
* avoid memory allocation.
*/
private val fetchedRecord: FetchedRecord = FetchedRecord(null, UNKNOWN_OFFSET)
/** Create a KafkaConsumer to fetch records for `topicPartition` */
private def createConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = {
val updatedKafkaParams = KafkaConfigUpdater("executor", kafkaParams.asScala.toMap)
.setAuthenticationConfigIfNeeded()
.build()
val c = new KafkaConsumer[Array[Byte], Array[Byte]](updatedKafkaParams)
val tps = new ju.ArrayList[TopicPartition]()
tps.add(topicPartition)
c.assign(tps)
c
}
private def runUninterruptiblyIfPossible[T](body: => T): T = Thread.currentThread match {
case ut: UninterruptibleThread =>
ut.runUninterruptibly(body)
case _ =>
logWarning("CachedKafkaConsumer is not running in UninterruptibleThread. " +
"It may hang when CachedKafkaConsumer's methods are interrupted because of KAFKA-1894")
body
}
/**
* Return the available offset range of the current partition. It's a pair of the earliest offset
* and the latest offset.
*/
def getAvailableOffsetRange(): AvailableOffsetRange = runUninterruptiblyIfPossible {
consumer.seekToBeginning(Set(topicPartition).asJava)
val earliestOffset = consumer.position(topicPartition)
consumer.seekToEnd(Set(topicPartition).asJava)
val latestOffset = consumer.position(topicPartition)
AvailableOffsetRange(earliestOffset, latestOffset)
}
/** @see [[KafkaDataConsumer.get]] */
def get(
offset: Long,
untilOffset: Long,
@ -238,8 +263,13 @@ private[kafka010] case class InternalKafkaConsumer(
ConsumerRecord[Array[Byte], Array[Byte]] = runUninterruptiblyIfPossible {
require(offset < untilOffset,
s"offset must always be less than untilOffset [offset: $offset, untilOffset: $untilOffset]")
val consumer = getOrRetrieveConsumer()
val fetchedData = getOrRetrieveFetchedData(offset)
logDebug(s"Get $groupId $topicPartition nextOffset ${fetchedData.nextOffsetInFetchedData} " +
s"requested $offset")
"requested $offset")
// The following loop is basically for `failOnDataLoss = false`. When `failOnDataLoss` is
// `false`, first, we will try to fetch the record at `offset`. If no such record exists, then
// we will move to the next available offset within `[offset, untilOffset)` and retry.
@ -252,7 +282,8 @@ private[kafka010] case class InternalKafkaConsumer(
while (toFetchOffset != UNKNOWN_OFFSET && !isFetchComplete) {
try {
fetchedRecord = fetchRecord(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss)
fetchedRecord = fetchRecord(consumer, fetchedData, toFetchOffset, untilOffset,
pollTimeoutMs, failOnDataLoss)
if (fetchedRecord.record != null) {
isFetchComplete = true
} else {
@ -269,9 +300,12 @@ private[kafka010] case class InternalKafkaConsumer(
// When there is some error thrown, it's better to use a new consumer to drop all cached
// states in the old consumer. We don't need to worry about the performance because this
// is not a common path.
resetConsumer()
reportDataLoss(failOnDataLoss, s"Cannot fetch offset $toFetchOffset", e)
toFetchOffset = getEarliestAvailableOffsetBetween(toFetchOffset, untilOffset)
releaseConsumer()
fetchedData.reset()
reportDataLoss(topicPartition, groupId, failOnDataLoss,
s"Cannot fetch offset $toFetchOffset", e)
toFetchOffset = getEarliestAvailableOffsetBetween(consumer, toFetchOffset, untilOffset)
}
}
@ -283,14 +317,52 @@ private[kafka010] case class InternalKafkaConsumer(
}
}
/**
* Return the available offset range of the current partition. It's a pair of the earliest offset
* and the latest offset.
*/
def getAvailableOffsetRange(): AvailableOffsetRange = runUninterruptiblyIfPossible {
val consumer = getOrRetrieveConsumer()
consumer.getAvailableOffsetRange()
}
/**
* Release borrowed objects in data reader to the pool. Once the instance is created, caller
* must call method after using the instance to make sure resources are not leaked.
*/
def release(): Unit = {
releaseConsumer()
releaseFetchedData()
}
private def releaseConsumer(): Unit = {
if (_consumer.isDefined) {
consumerPool.returnObject(_consumer.get)
_consumer = None
}
}
private def releaseFetchedData(): Unit = {
if (_fetchedData.isDefined) {
fetchedDataPool.release(cacheKey, _fetchedData.get)
_fetchedData = None
}
}
/**
* Return the next earliest available offset in [offset, untilOffset). If all offsets in
* [offset, untilOffset) are invalid (e.g., the topic is deleted and recreated), it will return
* `UNKNOWN_OFFSET`.
*/
private def getEarliestAvailableOffsetBetween(offset: Long, untilOffset: Long): Long = {
val range = getAvailableOffsetRange()
private def getEarliestAvailableOffsetBetween(
consumer: InternalKafkaConsumer,
offset: Long,
untilOffset: Long): Long = {
val range = consumer.getAvailableOffsetRange()
logWarning(s"Some data may be lost. Recovering from the earliest offset: ${range.earliest}")
val topicPartition = consumer.topicPartition
val groupId = consumer.groupId
if (offset >= range.latest || range.earliest >= untilOffset) {
// [offset, untilOffset) and [earliestOffset, latestOffset) have no overlap,
// either
@ -305,10 +377,10 @@ private[kafka010] case class InternalKafkaConsumer(
// | | | |
// offset untilOffset earliestOffset latestOffset
val warningMessage =
s"""
|The current available offset range is $range.
| Offset ${offset} is out of range, and records in [$offset, $untilOffset) will be
| skipped ${additionalMessage(failOnDataLoss = false)}
s"""
|The current available offset range is $range.
| Offset $offset is out of range, and records in [$offset, $untilOffset) will be
| skipped ${additionalMessage(topicPartition, groupId, failOnDataLoss = false)}
""".stripMargin
logWarning(warningMessage)
UNKNOWN_OFFSET
@ -321,8 +393,8 @@ private[kafka010] case class InternalKafkaConsumer(
// This will happen when a topic is deleted and recreated, and new data are pushed very fast,
// then we will see `offset` disappears first then appears again. Although the parameters
// are same, the state in Kafka cluster is changed, so the outer loop won't be endless.
logWarning(s"Found a disappeared offset $offset. " +
s"Some data may be lost ${additionalMessage(failOnDataLoss = false)}")
logWarning(s"Found a disappeared offset $offset. Some data may be lost " +
s"${additionalMessage(topicPartition, groupId, failOnDataLoss = false)}")
offset
} else {
// ------------------------------------------------------------------------------
@ -330,10 +402,10 @@ private[kafka010] case class InternalKafkaConsumer(
// | | | |
// offset earliestOffset min(untilOffset,latestOffset) max(untilOffset, latestOffset)
val warningMessage =
s"""
|The current available offset range is $range.
| Offset ${offset} is out of range, and records in [$offset, ${range.earliest}) will be
| skipped ${additionalMessage(failOnDataLoss = false)}
s"""
|The current available offset range is $range.
| Offset ${offset} is out of range, and records in [$offset, ${range.earliest}) will be
| skipped ${additionalMessage(topicPartition, groupId, failOnDataLoss = false)}
""".stripMargin
logWarning(warningMessage)
range.earliest
@ -355,6 +427,8 @@ private[kafka010] case class InternalKafkaConsumer(
* @throws TimeoutException if cannot fetch the record in `pollTimeoutMs` milliseconds.
*/
private def fetchRecord(
consumer: InternalKafkaConsumer,
fetchedData: FetchedData,
offset: Long,
untilOffset: Long,
pollTimeoutMs: Long,
@ -362,7 +436,7 @@ private[kafka010] case class InternalKafkaConsumer(
if (offset != fetchedData.nextOffsetInFetchedData) {
// This is the first fetch, or the fetched data has been reset.
// Fetch records from Kafka and update `fetchedData`.
fetchData(offset, pollTimeoutMs)
fetchData(consumer, fetchedData, offset, pollTimeoutMs)
} else if (!fetchedData.hasNext) { // The last pre-fetched data has been drained.
if (offset < fetchedData.offsetAfterPoll) {
// Offsets in [offset, fetchedData.offsetAfterPoll) are invisible. Return a record to ask
@ -372,7 +446,7 @@ private[kafka010] case class InternalKafkaConsumer(
return fetchedRecord.withRecord(null, nextOffsetToFetch)
} else {
// Fetch records from Kafka and update `fetchedData`.
fetchData(offset, pollTimeoutMs)
fetchData(consumer, fetchedData, offset, pollTimeoutMs)
}
}
@ -388,7 +462,7 @@ private[kafka010] case class InternalKafkaConsumer(
// In general, Kafka uses the specified offset as the start point, and tries to fetch the next
// available offset. Hence we need to handle offset mismatch.
if (record.offset > offset) {
val range = getAvailableOffsetRange()
val range = consumer.getAvailableOffsetRange()
if (range.earliest <= offset) {
// `offset` is still valid but the corresponding message is invisible. We should skip it
// and jump to `record.offset`. Here we move `fetchedData` back so that the next call of
@ -398,16 +472,19 @@ private[kafka010] case class InternalKafkaConsumer(
}
// This may happen when some records aged out but their offsets already got verified
if (failOnDataLoss) {
reportDataLoss(true, s"Cannot fetch records in [$offset, ${record.offset})")
reportDataLoss(consumer.topicPartition, consumer.groupId, failOnDataLoss = true,
s"Cannot fetch records in [$offset, ${record.offset})")
// Never happen as "reportDataLoss" will throw an exception
throw new IllegalStateException(
"reportDataLoss didn't throw an exception when 'failOnDataLoss' is true")
} else if (record.offset >= untilOffset) {
reportDataLoss(false, s"Skip missing records in [$offset, $untilOffset)")
reportDataLoss(consumer.topicPartition, consumer.groupId, failOnDataLoss = false,
s"Skip missing records in [$offset, $untilOffset)")
// Set `nextOffsetToFetch` to `untilOffset` to finish the current batch.
fetchedRecord.withRecord(null, untilOffset)
} else {
reportDataLoss(false, s"Skip missing records in [$offset, ${record.offset})")
reportDataLoss(consumer.topicPartition, consumer.groupId, failOnDataLoss = false,
s"Skip missing records in [$offset, ${record.offset})")
fetchedRecord.withRecord(record, fetchedData.nextOffsetInFetchedData)
}
} else if (record.offset < offset) {
@ -421,17 +498,49 @@ private[kafka010] case class InternalKafkaConsumer(
}
}
/** Create a new consumer and reset cached states */
private def resetConsumer(): Unit = {
consumer.close()
consumer = createConsumer
fetchedData.reset()
/**
* Poll messages from Kafka starting from `offset` and update `fetchedData`. `fetchedData` may be
* empty if the Kafka consumer fetches some messages but all of them are not visible messages
* (either transaction messages, or aborted messages when `isolation.level` is `read_committed`).
*
* @throws OffsetOutOfRangeException if `offset` is out of range.
* @throws TimeoutException if the consumer position is not changed after polling. It means the
* consumer polls nothing before timeout.
*/
private def fetchData(
consumer: InternalKafkaConsumer,
fetchedData: FetchedData,
offset: Long,
pollTimeoutMs: Long): Unit = {
val (records, offsetAfterPoll) = consumer.fetch(offset, pollTimeoutMs)
fetchedData.withNewPoll(records.listIterator, offsetAfterPoll)
}
private def getOrRetrieveConsumer(): InternalKafkaConsumer = _consumer match {
case None =>
_consumer = Option(consumerPool.borrowObject(cacheKey, kafkaParams))
require(_consumer.isDefined, "borrowing consumer from pool must always succeed.")
_consumer.get
case Some(consumer) => consumer
}
private def getOrRetrieveFetchedData(offset: Long): FetchedData = _fetchedData match {
case None =>
_fetchedData = Option(fetchedDataPool.acquire(cacheKey, offset))
require(_fetchedData.isDefined, "acquiring fetched data from cache must always succeed.")
_fetchedData.get
case Some(fetchedData) => fetchedData
}
/**
* Return an addition message including useful message and instruction.
*/
private def additionalMessage(failOnDataLoss: Boolean): String = {
private def additionalMessage(
topicPartition: TopicPartition,
groupId: String,
failOnDataLoss: Boolean): String = {
if (failOnDataLoss) {
s"(GroupId: $groupId, TopicPartition: $topicPartition). " +
s"$INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE"
@ -445,198 +554,71 @@ private[kafka010] case class InternalKafkaConsumer(
* Throw an exception or log a warning as per `failOnDataLoss`.
*/
private def reportDataLoss(
topicPartition: TopicPartition,
groupId: String,
failOnDataLoss: Boolean,
message: String,
cause: Throwable = null): Unit = {
val finalMessage = s"$message ${additionalMessage(failOnDataLoss)}"
val finalMessage = s"$message ${additionalMessage(topicPartition, groupId, failOnDataLoss)}"
reportDataLoss0(failOnDataLoss, finalMessage, cause)
}
def close(): Unit = consumer.close()
private def seek(offset: Long): Unit = {
logDebug(s"Seeking to $groupId $topicPartition $offset")
consumer.seek(topicPartition, offset)
}
/**
* Poll messages from Kafka starting from `offset` and update `fetchedData`. `fetchedData` may be
* empty if the Kafka consumer fetches some messages but all of them are not visible messages
* (either transaction messages, or aborted messages when `isolation.level` is `read_committed`).
*
* @throws OffsetOutOfRangeException if `offset` is out of range.
* @throws TimeoutException if the consumer position is not changed after polling. It means the
* consumer polls nothing before timeout.
*/
private def fetchData(offset: Long, pollTimeoutMs: Long): Unit = {
// Seek to the offset because we may call seekToBeginning or seekToEnd before this.
seek(offset)
val p = consumer.poll(pollTimeoutMs)
val r = p.records(topicPartition)
logDebug(s"Polled $groupId ${p.partitions()} ${r.size}")
val offsetAfterPoll = consumer.position(topicPartition)
logDebug(s"Offset changed from $offset to $offsetAfterPoll after polling")
fetchedData.withNewPoll(r.listIterator, offsetAfterPoll)
if (!fetchedData.hasNext) {
// We cannot fetch anything after `poll`. Two possible cases:
// - `offset` is out of range so that Kafka returns nothing. `OffsetOutOfRangeException` will
// be thrown.
// - Cannot fetch any data before timeout. `TimeoutException` will be thrown.
// - Fetched something but all of them are not invisible. This is a valid case and let the
// caller handles this.
val range = getAvailableOffsetRange()
if (offset < range.earliest || offset >= range.latest) {
throw new OffsetOutOfRangeException(
Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
} else if (offset == offsetAfterPoll) {
throw new TimeoutException(
s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds")
}
}
private def runUninterruptiblyIfPossible[T](body: => T): T = Thread.currentThread match {
case ut: UninterruptibleThread =>
ut.runUninterruptibly(body)
case _ =>
logWarning("KafkaDataConsumer is not running in UninterruptibleThread. " +
"It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894")
body
}
}
private[kafka010] object KafkaDataConsumer extends Logging {
val UNKNOWN_OFFSET = -2L
case class AvailableOffsetRange(earliest: Long, latest: Long)
private case class CachedKafkaDataConsumer(internalConsumer: InternalKafkaConsumer)
extends KafkaDataConsumer {
assert(internalConsumer.inUse) // make sure this has been set to true
override def release(): Unit = { KafkaDataConsumer.release(internalConsumer) }
}
private case class NonCachedKafkaDataConsumer(internalConsumer: InternalKafkaConsumer)
extends KafkaDataConsumer {
override def release(): Unit = { internalConsumer.close() }
}
  case class CacheKey(groupId: String, topicPartition: TopicPartition) {
    def this(topicPartition: TopicPartition, kafkaParams: ju.Map[String, Object]) =
      this(kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String], topicPartition)
  }

  private val sparkConf = SparkEnv.get.conf

  // Both pools make a best-effort attempt to keep their size within the configured capacity.
  // The capacity is not guaranteed to be maintained, especially when there are more active
  // tasks simultaneously using consumers than the capacity; in that case the pools hand out
  // extra objects and evict idle ones afterwards.
  private val consumerPool = new InternalKafkaConsumerPool(sparkConf)
  private val fetchedDataPool = new FetchedDataPool(sparkConf)

  ShutdownHookManager.addShutdownHook { () =>
    try {
      fetchedDataPool.shutdown()
      consumerPool.close()
    } catch {
      case e: Throwable =>
        logWarning("Ignoring exception while shutting down pools from shutdown hook", e)
    }
  }

  /**
   * Get a data reader for groupId, assigned to topic and partition.
   * If a matching consumer doesn't already exist, it will be created using kafkaParams.
   * The returned data reader must be released explicitly.
   */
  def acquire(
      topicPartition: TopicPartition,
      kafkaParams: ju.Map[String, Object]): KafkaDataConsumer = {
    if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
      val cacheKey = new CacheKey(topicPartition, kafkaParams)

      // If this is a reattempt at running the task, invalidate the cached consumer if any and
      // start with a new one.
      consumerPool.invalidateKey(cacheKey)

      // Invalidate all fetched data for the key as well. Sadly we can't pinpoint the exact
      // piece of data and invalidate only that, because fetched data doesn't carry a unique id.
      fetchedDataPool.invalidate(cacheKey)
    }

    new KafkaDataConsumer(topicPartition, kafkaParams, consumerPool, fetchedDataPool)
  }
}
private[kafka010] object InternalKafkaConsumer extends Logging {
private val UNKNOWN_OFFSET = -2L
private def reportDataLoss0(
failOnDataLoss: Boolean,
finalMessage: String,
@ -655,4 +637,5 @@ private[kafka010] object InternalKafkaConsumer extends Logging {
}
}
}
}
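
Not part of the patch, but as a reading aid: a minimal sketch of how a task-side caller is expected to use the pooled consumer. The `acquire`, `getAvailableOffsetRange`, `get` and `release` calls mirror those exercised in the test suites below; `topicPartition` and the executor-side `kafkaParams` are assumed to be in scope.

```scala
// Hypothetical task-side usage of the pooled KafkaDataConsumer.
val consumer = KafkaDataConsumer.acquire(topicPartition, kafkaParams)
try {
  val range = consumer.getAvailableOffsetRange()
  var offset = range.earliest
  while (offset < range.latest) {
    // get(offset, untilOffset, pollTimeoutMs, failOnDataLoss)
    val record = consumer.get(offset, range.latest, 10000, failOnDataLoss = false)
    offset = record.offset() + 1
  }
} finally {
  // Return the underlying consumer and its fetched data to the pools.
  consumer.release()
}
```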


@ -144,14 +144,9 @@ private[kafka010] class KafkaMicroBatchStream(
untilOffsets = untilOffsets,
executorLocations = getSortedExecutorList())
// Reuse Kafka consumers only when all the offset ranges have distinct TopicPartitions,
// that is, concurrent tasks will not read the same TopicPartitions.
val reuseKafkaConsumer = offsetRanges.map(_.topicPartition).toSet.size == offsetRanges.size
// Generate factories based on the offset ranges
offsetRanges.map { range =>
KafkaBatchInputPartition(
range, executorKafkaParams, pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer)
KafkaBatchInputPartition(range, executorKafkaParams, pollTimeoutMs, failOnDataLoss)
}.toArray
}


@ -102,7 +102,7 @@ private[kafka010] class KafkaRelation(
KafkaSourceProvider.kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId)
val rdd = new KafkaSourceRDD(
sqlContext.sparkContext, executorKafkaParams, offsetRanges,
pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer = false).map { cr =>
pollTimeoutMs, failOnDataLoss).map { cr =>
InternalRow(
cr.key,
cr.value,


@ -268,8 +268,7 @@ private[kafka010] class KafkaSource(
// Create an RDD that reads from Kafka and get the (key, value) pair as byte arrays.
val rdd = new KafkaSourceRDD(
sc, executorKafkaParams, offsetRanges, pollTimeoutMs, failOnDataLoss,
reuseKafkaConsumer = true).map { cr =>
sc, executorKafkaParams, offsetRanges, pollTimeoutMs, failOnDataLoss).map { cr =>
InternalRow(
cr.key,
cr.value,


@ -63,8 +63,7 @@ private[kafka010] class KafkaSourceRDD(
executorKafkaParams: ju.Map[String, Object],
offsetRanges: Seq[KafkaSourceRDDOffsetRange],
pollTimeoutMs: Long,
failOnDataLoss: Boolean,
reuseKafkaConsumer: Boolean)
failOnDataLoss: Boolean)
extends RDD[ConsumerRecord[Array[Byte], Array[Byte]]](sc, Nil) {
override def persist(newLevel: StorageLevel): this.type = {
@ -87,7 +86,7 @@ private[kafka010] class KafkaSourceRDD(
context: TaskContext): Iterator[ConsumerRecord[Array[Byte], Array[Byte]]] = {
val sourcePartition = thePart.asInstanceOf[KafkaSourceRDDPartition]
val consumer = KafkaDataConsumer.acquire(
sourcePartition.offsetRange.topicPartition, executorKafkaParams, reuseKafkaConsumer)
sourcePartition.offsetRange.topicPartition, executorKafkaParams)
val range = resolveRange(consumer, sourcePartition.offsetRange)
assert(


@ -38,4 +38,40 @@ package object kafka010 { // scalastyle:ignore
" (check Structured Streaming Kafka integration guide for further details).")
.intConf
.createWithDefault(64)
private[kafka010] val CONSUMER_CACHE_JMX_ENABLED =
ConfigBuilder("spark.kafka.consumer.cache.jmx.enable")
.doc("Enable or disable JMX for pools created with this configuration instance.")
.booleanConf
.createWithDefault(false)
private[kafka010] val CONSUMER_CACHE_TIMEOUT =
ConfigBuilder("spark.kafka.consumer.cache.timeout")
.doc("The minimum amount of time a consumer may sit idle in the pool before " +
"it is eligible for eviction by the evictor. " +
"When non-positive, no consumers will be evicted from the pool due to idle time alone.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("5m")
private[kafka010] val CONSUMER_CACHE_EVICTOR_THREAD_RUN_INTERVAL =
ConfigBuilder("spark.kafka.consumer.cache.evictorThreadRunInterval")
.doc("The interval of time between runs of the idle evictor thread for consumer pool. " +
"When non-positive, no idle evictor thread will be run.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1m")
private[kafka010] val FETCHED_DATA_CACHE_TIMEOUT =
ConfigBuilder("spark.kafka.consumer.fetchedData.cache.timeout")
.doc("The minimum amount of time a fetched data may sit idle in the pool before " +
"it is eligible for eviction by the evictor. " +
"When non-positive, no fetched data will be evicted from the pool due to idle time alone.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("5m")
private[kafka010] val FETCHED_DATA_CACHE_EVICTOR_THREAD_RUN_INTERVAL =
ConfigBuilder("spark.kafka.consumer.fetchedData.cache.evictorThreadRunInterval")
.doc("The interval of time between runs of the idle evictor thread for fetched data pool. " +
"When non-positive, no idle evictor thread will be run.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1m")
}
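
For reference, a minimal sketch of how these pool settings could be supplied when building a session (the values are purely illustrative, and only the configuration keys defined above are used; they need to be set before the application's executors start):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative values: keep idle consumers and fetched data around for 10 minutes,
// run the evictor threads every 30 seconds, and expose consumer pool metrics via JMX.
val spark = SparkSession.builder()
  .appName("kafka-pool-tuning-sketch")
  .config("spark.kafka.consumer.cache.jmx.enable", "true")
  .config("spark.kafka.consumer.cache.timeout", "10m")
  .config("spark.kafka.consumer.cache.evictorThreadRunInterval", "30s")
  .config("spark.kafka.consumer.fetchedData.cache.timeout", "10m")
  .config("spark.kafka.consumer.fetchedData.cache.evictorThreadRunInterval", "30s")
  .getOrCreate()
```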


@ -0,0 +1,304 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.kafka010
import java.{util => ju}
import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
import scala.collection.mutable
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.jmock.lib.concurrent.DeterministicScheduler
import org.scalatest.PrivateMethodTester
import org.apache.spark.SparkConf
import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.util.ManualClock
class FetchedDataPoolSuite extends SharedSparkSession with PrivateMethodTester {
import FetchedDataPool._
type Record = ConsumerRecord[Array[Byte], Array[Byte]]
private val dummyBytes = "dummy".getBytes
// Helper private method accessors for FetchedDataPool
private type PoolCacheType = mutable.Map[CacheKey, CachedFetchedDataList]
private val _cache = PrivateMethod[PoolCacheType]('cache)
def getCache(pool: FetchedDataPool): PoolCacheType = {
pool.invokePrivate(_cache())
}
test("acquire fetched data from multiple keys") {
val dataPool = new FetchedDataPool(new SparkConf())
val cacheKeys = (0 until 10).map { partId =>
CacheKey("testgroup", new TopicPartition("topic", partId))
}
assert(getCache(dataPool).size === 0)
cacheKeys.foreach { key => assert(getCache(dataPool).get(key).isEmpty) }
val dataList = cacheKeys.map(key => (key, dataPool.acquire(key, 0)))
assert(getCache(dataPool).size === cacheKeys.size)
cacheKeys.map { key =>
assert(getCache(dataPool)(key).size === 1)
assert(getCache(dataPool)(key).head.inUse)
}
assertFetchedDataPoolStatistic(dataPool, expectedNumCreated = 10, expectedNumTotal = 10)
dataList.map { case (_, data) =>
data.withNewPoll(testRecords(0, 5).listIterator, 5)
}
dataList.foreach { case (key, data) =>
dataPool.release(key, data)
}
assert(getCache(dataPool).size === cacheKeys.size)
cacheKeys.map { key =>
assert(getCache(dataPool)(key).size === 1)
assert(!getCache(dataPool)(key).head.inUse)
}
dataPool.shutdown()
}
test("continuous use of fetched data from single key") {
val dataPool = new FetchedDataPool(new SparkConf())
val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
val data = dataPool.acquire(cacheKey, 0)
data.withNewPoll(testRecords(0, 5).listIterator, 5)
(0 to 3).foreach { _ => data.next() }
dataPool.release(cacheKey, data)
// suppose next batch
val data2 = dataPool.acquire(cacheKey, data.nextOffsetInFetchedData)
assert(data.eq(data2))
assertFetchedDataPoolStatistic(dataPool, expectedNumCreated = 1, expectedNumTotal = 1)
assert(getCache(dataPool)(cacheKey).size === 1)
assert(getCache(dataPool)(cacheKey).head.inUse)
dataPool.release(cacheKey, data2)
assert(getCache(dataPool)(cacheKey).size === 1)
assert(!getCache(dataPool)(cacheKey).head.inUse)
dataPool.shutdown()
}
test("multiple tasks referring same key continuously using fetched data") {
val dataPool = new FetchedDataPool(new SparkConf())
val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
val dataFromTask1 = dataPool.acquire(cacheKey, 0)
val dataFromTask2 = dataPool.acquire(cacheKey, 0)
// it shouldn't return the same object as dataFromTask1, even though it asks for the same offset
// it clearly works when the offsets don't overlap, so we skip adding a test for that
assertFetchedDataPoolStatistic(dataPool, expectedNumCreated = 2, expectedNumTotal = 2)
assert(getCache(dataPool)(cacheKey).size === 2)
assert(getCache(dataPool)(cacheKey)(1).inUse)
// reading from task 1
dataFromTask1.withNewPoll(testRecords(0, 5).listIterator, 5)
(0 to 3).foreach { _ => dataFromTask1.next() }
dataPool.release(cacheKey, dataFromTask1)
// reading from task 2
dataFromTask2.withNewPoll(testRecords(0, 30).listIterator, 30)
(0 to 5).foreach { _ => dataFromTask2.next() }
dataPool.release(cacheKey, dataFromTask2)
// suppose next batch for task 1
val data2FromTask1 = dataPool.acquire(cacheKey, dataFromTask1.nextOffsetInFetchedData)
assert(data2FromTask1.eq(dataFromTask1))
assertFetchedDataPoolStatistic(dataPool, expectedNumCreated = 2, expectedNumTotal = 2)
assert(getCache(dataPool)(cacheKey).head.inUse)
// suppose next batch for task 2
val data2FromTask2 = dataPool.acquire(cacheKey, dataFromTask2.nextOffsetInFetchedData)
assert(data2FromTask2.eq(dataFromTask2))
assertFetchedDataPoolStatistic(dataPool, expectedNumCreated = 2, expectedNumTotal = 2)
assert(getCache(dataPool)(cacheKey)(1).inUse)
// release from task 2
dataPool.release(cacheKey, data2FromTask2)
assert(!getCache(dataPool)(cacheKey)(1).inUse)
// release from task 1
dataPool.release(cacheKey, data2FromTask1)
assert(!getCache(dataPool)(cacheKey).head.inUse)
dataPool.shutdown()
}
test("evict idle fetched data") {
val minEvictableIdleTimeMillis = 2000L
val evictorThreadRunIntervalMillis = 500L
val conf = new SparkConf()
conf.set(FETCHED_DATA_CACHE_TIMEOUT, minEvictableIdleTimeMillis)
conf.set(FETCHED_DATA_CACHE_EVICTOR_THREAD_RUN_INTERVAL, evictorThreadRunIntervalMillis)
val scheduler = new DeterministicScheduler()
val clock = new ManualClock()
val dataPool = new FetchedDataPool(scheduler, clock, conf)
val cacheKeys = (0 until 10).map { partId =>
CacheKey("testgroup", new TopicPartition("topic", partId))
}
val dataList = cacheKeys.map(key => (key, dataPool.acquire(key, 0)))
assertFetchedDataPoolStatistic(dataPool, expectedNumCreated = 10, expectedNumTotal = 10)
dataList.map { case (_, data) =>
data.withNewPoll(testRecords(0, 5).listIterator, 5)
}
val dataToEvict = dataList.take(3)
// release key with around 500 ms delay, so that we can check eviction per key
dataToEvict.foreach { case (key, data) =>
dataPool.release(key, data)
clock.advance(500)
}
// time elapsed after releasing
// first key: 1500ms, second key: 1000 ms, third key: 500 ms
// advancing - first key: 2100ms, second key: 1600 ms, third key: 1100 ms
clock.advance(600)
scheduler.tick(minEvictableIdleTimeMillis + 100, TimeUnit.MILLISECONDS)
assert(getCache(dataPool)(dataToEvict(0)._1).isEmpty)
assert(getCache(dataPool)(dataToEvict(1)._1).nonEmpty)
assert(getCache(dataPool)(dataToEvict(2)._1).nonEmpty)
// advancing - second key: 2100 ms, third key: 1600 ms
clock.advance(500)
scheduler.tick(minEvictableIdleTimeMillis + 100, TimeUnit.MILLISECONDS)
assert(getCache(dataPool)(dataToEvict(0)._1).isEmpty)
assert(getCache(dataPool)(dataToEvict(1)._1).isEmpty)
assert(getCache(dataPool)(dataToEvict(2)._1).nonEmpty)
// advancing - third key: 2300 ms
clock.advance(500)
scheduler.tick(minEvictableIdleTimeMillis + 100, TimeUnit.MILLISECONDS)
assert(getCache(dataPool)(dataToEvict(0)._1).isEmpty)
assert(getCache(dataPool)(dataToEvict(1)._1).isEmpty)
assert(getCache(dataPool)(dataToEvict(2)._1).isEmpty)
assertFetchedDataPoolStatistic(dataPool, expectedNumCreated = 10, expectedNumTotal = 7)
assert(getCache(dataPool).values.map(_.size).sum === dataList.size - dataToEvict.size)
dataList.takeRight(3).foreach { case (key, data) =>
dataPool.release(key, data)
}
// add objects to be candidates for eviction
clock.advance(minEvictableIdleTimeMillis + 100)
// ensure releasing more objects doesn't trigger eviction unless the evictor runs
assertFetchedDataPoolStatistic(dataPool, expectedNumCreated = 10, expectedNumTotal = 7)
assert(getCache(dataPool).values.map(_.size).sum === dataList.size - dataToEvict.size)
try {
dataPool.shutdown()
} catch {
// ignore, as it's a known issue: DeterministicScheduler doesn't support shutdown
case _: UnsupportedOperationException =>
}
}
test("invalidate key") {
val dataPool = new FetchedDataPool(new SparkConf())
val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
val dataFromTask1 = dataPool.acquire(cacheKey, 0)
val dataFromTask2 = dataPool.acquire(cacheKey, 0)
assertFetchedDataPoolStatistic(dataPool, expectedNumCreated = 2, expectedNumTotal = 2)
// 1 idle, 1 active
dataPool.release(cacheKey, dataFromTask1)
val cacheKey2 = CacheKey("testgroup", new TopicPartition("topic", 1))
dataPool.acquire(cacheKey2, 0)
assertFetchedDataPoolStatistic(dataPool, expectedNumCreated = 3, expectedNumTotal = 3)
assert(getCache(dataPool).size === 2)
assert(getCache(dataPool)(cacheKey).size === 2)
assert(getCache(dataPool)(cacheKey2).size === 1)
dataPool.invalidate(cacheKey)
assertFetchedDataPoolStatistic(dataPool, expectedNumCreated = 3, expectedNumTotal = 1)
assert(getCache(dataPool).size === 1)
assert(getCache(dataPool).get(cacheKey).isEmpty)
// it doesn't affect other keys
assert(getCache(dataPool)(cacheKey2).size === 1)
dataPool.release(cacheKey, dataFromTask2)
// releasing invalidated objects doesn't throw an error, but they don't get cached again
assert(getCache(dataPool).size === 1)
assert(getCache(dataPool).get(cacheKey).isEmpty)
dataPool.shutdown()
}
private def testRecords(startOffset: Long, count: Int): ju.List[Record] = {
(0 until count).map { offset =>
new Record("topic", 0, startOffset + offset, dummyBytes, dummyBytes)
}.toList.asJava
}
private def assertFetchedDataPoolStatistic(
fetchedDataPool: FetchedDataPool,
expectedNumCreated: Long,
expectedNumTotal: Long): Unit = {
assert(fetchedDataPool.numCreated === expectedNumCreated)
assert(fetchedDataPool.numTotal === expectedNumTotal)
}
}


@ -0,0 +1,285 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.kafka010
import java.{util => ju}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.ConsumerConfig._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
import org.apache.spark.sql.test.SharedSparkSession
class InternalKafkaConsumerPoolSuite extends SharedSparkSession {
test("basic multiple borrows and returns for single key") {
val pool = new InternalKafkaConsumerPool(new SparkConf())
val topic = "topic"
val partitionId = 0
val topicPartition = new TopicPartition(topic, partitionId)
val kafkaParams: ju.Map[String, Object] = getTestKafkaParams
val key = new CacheKey(topicPartition, kafkaParams)
val pooledObjects = (0 to 2).map { _ =>
val pooledObject = pool.borrowObject(key, kafkaParams)
assertPooledObject(pooledObject, topicPartition, kafkaParams)
pooledObject
}
assertPoolStateForKey(pool, key, numIdle = 0, numActive = 3, numTotal = 3)
assertPoolState(pool, numIdle = 0, numActive = 3, numTotal = 3)
val pooledObject2 = pool.borrowObject(key, kafkaParams)
assertPooledObject(pooledObject2, topicPartition, kafkaParams)
assertPoolStateForKey(pool, key, numIdle = 0, numActive = 4, numTotal = 4)
assertPoolState(pool, numIdle = 0, numActive = 4, numTotal = 4)
pooledObjects.foreach(pool.returnObject)
assertPoolStateForKey(pool, key, numIdle = 3, numActive = 1, numTotal = 4)
assertPoolState(pool, numIdle = 3, numActive = 1, numTotal = 4)
pool.returnObject(pooledObject2)
// we only allow three idle objects per key
assertPoolStateForKey(pool, key, numIdle = 3, numActive = 0, numTotal = 3)
assertPoolState(pool, numIdle = 3, numActive = 0, numTotal = 3)
pool.close()
}
test("basic borrow and return for multiple keys") {
val pool = new InternalKafkaConsumerPool(new SparkConf())
val kafkaParams = getTestKafkaParams
val topicPartitions = createTopicPartitions(Seq("topic", "topic2"), 6)
val keys = createCacheKeys(topicPartitions, kafkaParams)
// while borrowing in the loop, the pool never exceeds the soft max pool size
val keyToPooledObjectPairs = borrowObjectsPerKey(pool, kafkaParams, keys)
assertPoolState(pool, numIdle = 0, numActive = keyToPooledObjectPairs.length,
numTotal = keyToPooledObjectPairs.length)
returnObjects(pool, keyToPooledObjectPairs)
assertPoolState(pool, numIdle = keyToPooledObjectPairs.length, numActive = 0,
numTotal = keyToPooledObjectPairs.length)
pool.close()
}
test("borrow more than soft max capacity from pool which is neither free space nor idle object") {
testWithPoolBorrowedSoftMaxCapacity { (pool, kafkaParams, keyToPooledObjectPairs) =>
val moreTopicPartition = new TopicPartition("topic2", 0)
val newCacheKey = new CacheKey(moreTopicPartition, kafkaParams)
// this exceeds the soft max pool size and there's no idle object to clean up,
// but the pool will still provide a new object
pool.borrowObject(newCacheKey, kafkaParams)
assertPoolState(pool, numIdle = 0, numActive = keyToPooledObjectPairs.length + 1,
numTotal = keyToPooledObjectPairs.length + 1)
}
}
test("borrow more than soft max capacity from pool frees up idle objects automatically") {
testWithPoolBorrowedSoftMaxCapacity { (pool, kafkaParams, keyToPooledObjectPairs) =>
// return 20% of objects to ensure there are some idle objects to free up later
val numToReturn = (keyToPooledObjectPairs.length * 0.2).toInt
returnObjects(pool, keyToPooledObjectPairs.take(numToReturn))
assertPoolState(pool, numIdle = numToReturn,
numActive = keyToPooledObjectPairs.length - numToReturn,
numTotal = keyToPooledObjectPairs.length)
// borrow a new object: there should be some idle objects to clean up
val moreTopicPartition = new TopicPartition("topic2", 0)
val newCacheKey = new CacheKey(moreTopicPartition, kafkaParams)
val newObject = pool.borrowObject(newCacheKey, kafkaParams)
assertPooledObject(newObject, moreTopicPartition, kafkaParams)
assertPoolStateForKey(pool, newCacheKey, numIdle = 0, numActive = 1, numTotal = 1)
// at least one of the idle objects should be freed up
assert(pool.numIdle < numToReturn)
// we can determine the number of active objects exactly
assert(pool.numActive === keyToPooledObjectPairs.length - numToReturn + 1)
// the total should exceed the number of active objects, but we can't expect an exact number
assert(pool.size > keyToPooledObjectPairs.length - numToReturn + 1)
}
}
private def testWithPoolBorrowedSoftMaxCapacity(
testFn: (InternalKafkaConsumerPool,
ju.Map[String, Object],
Seq[(CacheKey, InternalKafkaConsumer)]) => Unit): Unit = {
val capacity = 16
val conf = new SparkConf()
conf.set(CONSUMER_CACHE_CAPACITY, capacity)
conf.set(CONSUMER_CACHE_TIMEOUT, -1L)
conf.set(CONSUMER_CACHE_EVICTOR_THREAD_RUN_INTERVAL, -1L)
val pool = new InternalKafkaConsumerPool(conf)
try {
val kafkaParams = getTestKafkaParams
val topicPartitions = createTopicPartitions(Seq("topic"), capacity)
val keys = createCacheKeys(topicPartitions, kafkaParams)
// borrow objects until the pool reaches its soft capacity
val keyToPooledObjectPairs = borrowObjectsPerKey(pool, kafkaParams, keys)
testFn(pool, kafkaParams, keyToPooledObjectPairs)
} finally {
pool.close()
}
}
test("evicting idle objects on background") {
import org.scalatest.time.SpanSugar._
val minEvictableIdleTimeMillis = 3 * 1000L // 3 seconds
val evictorThreadRunIntervalMillis = 500L // triggering multiple evictions by intention
val conf = new SparkConf()
conf.set(CONSUMER_CACHE_TIMEOUT, minEvictableIdleTimeMillis)
conf.set(CONSUMER_CACHE_EVICTOR_THREAD_RUN_INTERVAL, evictorThreadRunIntervalMillis)
val pool = new InternalKafkaConsumerPool(conf)
val kafkaParams = getTestKafkaParams
val topicPartitions = createTopicPartitions(Seq("topic"), 10)
val keys = createCacheKeys(topicPartitions, kafkaParams)
// borrow and return some consumers to ensure some partitions sit idle
// this test covers the use cases where rebalance / topic removal happens while running a query
val keyToPooledObjectPairs = borrowObjectsPerKey(pool, kafkaParams, keys)
val objectsToReturn = keyToPooledObjectPairs.filter(_._1.topicPartition.partition() % 2 == 0)
returnObjects(pool, objectsToReturn)
// wait up to twice minEvictableIdleTimeMillis to give the evictor thread time to clear up
// idle objects
eventually(timeout((minEvictableIdleTimeMillis * 2).milliseconds),
interval(evictorThreadRunIntervalMillis.milliseconds)) {
assertPoolState(pool, numIdle = 0, numActive = 5, numTotal = 5)
}
pool.close()
}
private def createTopicPartitions(
topicNames: Seq[String],
countPartition: Int): List[TopicPartition] = {
for (
topic <- topicNames.toList;
partitionId <- 0 until countPartition
) yield new TopicPartition(topic, partitionId)
}
private def createCacheKeys(
topicPartitions: List[TopicPartition],
kafkaParams: ju.Map[String, Object]): List[CacheKey] = {
topicPartitions.map(new CacheKey(_, kafkaParams))
}
private def assertPooledObject(
pooledObject: InternalKafkaConsumer,
expectedTopicPartition: TopicPartition,
expectedKafkaParams: ju.Map[String, Object]): Unit = {
assert(pooledObject != null)
assert(pooledObject.kafkaParams === expectedKafkaParams)
assert(pooledObject.topicPartition === expectedTopicPartition)
}
private def assertPoolState(
pool: InternalKafkaConsumerPool,
numIdle: Int,
numActive: Int,
numTotal: Int): Unit = {
assert(pool.numIdle === numIdle)
assert(pool.numActive === numActive)
assert(pool.size === numTotal)
}
private def assertPoolStateForKey(
pool: InternalKafkaConsumerPool,
key: CacheKey,
numIdle: Int,
numActive: Int,
numTotal: Int): Unit = {
assert(pool.numIdle(key) === numIdle)
assert(pool.numActive(key) === numActive)
assert(pool.size(key) === numTotal)
}
private def getTestKafkaParams: ju.Map[String, Object] = Map[String, Object](
GROUP_ID_CONFIG -> "groupId",
BOOTSTRAP_SERVERS_CONFIG -> "PLAINTEXT://localhost:9092",
KEY_DESERIALIZER_CLASS_CONFIG -> classOf[ByteArrayDeserializer].getName,
VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[ByteArrayDeserializer].getName,
AUTO_OFFSET_RESET_CONFIG -> "earliest",
ENABLE_AUTO_COMMIT_CONFIG -> "false"
).asJava
private def borrowObjectsPerKey(
pool: InternalKafkaConsumerPool,
kafkaParams: ju.Map[String, Object],
keys: List[CacheKey]): Seq[(CacheKey, InternalKafkaConsumer)] = {
keys.map { key =>
val numActiveBeforeBorrowing = pool.numActive
val numIdleBeforeBorrowing = pool.numIdle
val numTotalBeforeBorrowing = pool.size
val pooledObj = pool.borrowObject(key, kafkaParams)
assertPoolStateForKey(pool, key, numIdle = 0, numActive = 1, numTotal = 1)
assertPoolState(pool, numIdle = numIdleBeforeBorrowing,
numActive = numActiveBeforeBorrowing + 1, numTotal = numTotalBeforeBorrowing + 1)
(key, pooledObj)
}
}
private def returnObjects(
pool: InternalKafkaConsumerPool,
objects: Seq[(CacheKey, InternalKafkaConsumer)]): Unit = {
objects.foreach { case (key, pooledObj) =>
val numActiveBeforeReturning = pool.numActive
val numIdleBeforeReturning = pool.numIdle
val numTotalBeforeReturning = pool.size
pool.returnObject(pooledObj)
// each key had exactly one object borrowed, so returning it leaves a single idle object
assertPoolStateForKey(pool, key, numIdle = 1, numActive = 0, numTotal = 1)
assertPoolState(pool, numIdle = numIdleBeforeReturning + 1,
numActive = numActiveBeforeReturning - 1, numTotal = numTotalBeforeReturning)
}
}
}


@ -20,6 +20,7 @@ package org.apache.spark.sql.kafka010
import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.JavaConverters._
import scala.collection.immutable
import scala.util.Random
import org.apache.kafka.clients.consumer.ConsumerConfig._
@ -60,49 +61,85 @@ class KafkaDataConsumerSuite extends SharedSparkSession with PrivateMethodTester
AUTO_OFFSET_RESET_CONFIG -> "earliest",
ENABLE_AUTO_COMMIT_CONFIG -> "false"
).asJava
private var fetchedDataPool: FetchedDataPool = _
private var consumerPool: InternalKafkaConsumerPool = _
override def beforeEach(): Unit = {
fetchedDataPool = {
val fetchedDataPoolMethod = PrivateMethod[FetchedDataPool]('fetchedDataPool)
KafkaDataConsumer.invokePrivate(fetchedDataPoolMethod())
}
consumerPool = {
val internalKafkaConsumerPoolMethod = PrivateMethod[InternalKafkaConsumerPool]('consumerPool)
KafkaDataConsumer.invokePrivate(internalKafkaConsumerPoolMethod())
}
fetchedDataPool.reset()
consumerPool.reset()
}
test("SPARK-19886: Report error cause correctly in reportDataLoss") {
val cause = new Exception("D'oh!")
val reportDataLoss = PrivateMethod[Unit]('reportDataLoss0)
val e = intercept[IllegalStateException] {
InternalKafkaConsumer.invokePrivate(reportDataLoss(true, "message", cause))
KafkaDataConsumer.invokePrivate(reportDataLoss(true, "message", cause))
}
assert(e.getCause === cause)
}
test("new KafkaDataConsumer instance in case of Task retry") {
try {
KafkaDataConsumer.cache.clear()
val kafkaParams = getKafkaParams()
val key = new CacheKey(groupId, topicPartition)
val context1 = new TaskContextImpl(0, 0, 0, 0, 0, null, null, null)
TaskContext.setTaskContext(context1)
val consumer1 = KafkaDataConsumer.acquire(topicPartition, kafkaParams, true)
val consumer1 = KafkaDataConsumer.acquire(topicPartition, kafkaParams)
// any method call which requires consumer is necessary
consumer1.getAvailableOffsetRange()
val consumer1Underlying = consumer1._consumer
assert(consumer1Underlying.isDefined)
consumer1.release()
assert(KafkaDataConsumer.cache.size() == 1)
assert(KafkaDataConsumer.cache.get(key).eq(consumer1.internalConsumer))
assert(consumerPool.size(key) === 1)
// check whether acquired object is available in pool
val pooledObj = consumerPool.borrowObject(key, kafkaParams)
assert(consumer1Underlying.get.eq(pooledObj))
consumerPool.returnObject(pooledObj)
val context2 = new TaskContextImpl(0, 0, 0, 0, 1, null, null, null)
TaskContext.setTaskContext(context2)
val consumer2 = KafkaDataConsumer.acquire(topicPartition, kafkaParams, true)
val consumer2 = KafkaDataConsumer.acquire(topicPartition, kafkaParams)
// any method call which requires consumer is necessary
consumer2.getAvailableOffsetRange()
val consumer2Underlying = consumer2._consumer
assert(consumer2Underlying.isDefined)
// here we expect different consumer as pool will invalidate for task reattempt
assert(consumer2Underlying.get.ne(consumer1Underlying.get))
consumer2.release()
// The first consumer should be removed from cache and new non-cached should be returned
assert(KafkaDataConsumer.cache.size() == 0)
assert(consumer1.internalConsumer.ne(consumer2.internalConsumer))
// The first consumer should be removed from cache, but the consumer after invalidate
// should be cached.
assert(consumerPool.size(key) === 1)
val pooledObj2 = consumerPool.borrowObject(key, kafkaParams)
assert(consumer2Underlying.get.eq(pooledObj2))
consumerPool.returnObject(pooledObj2)
} finally {
TaskContext.unset()
}
}
test("SPARK-23623: concurrent use of KafkaDataConsumer") {
val data = (1 to 1000).map(_.toString)
testUtils.createTopic(topic, 1)
testUtils.sendMessages(topic, data.toArray)
val data: immutable.IndexedSeq[String] = prepareTestTopicHavingTestMessages(topic)
val topicPartition = new TopicPartition(topic, 0)
val kafkaParams = getKafkaParams()
val numThreads = 100
val numConsumerUsages = 500
@ -110,14 +147,13 @@ class KafkaDataConsumerSuite extends SharedSparkSession with PrivateMethodTester
@volatile var error: Throwable = null
def consume(i: Int): Unit = {
val useCache = Random.nextBoolean
val taskContext = if (Random.nextBoolean) {
new TaskContextImpl(0, 0, 0, 0, attemptNumber = Random.nextInt(2), null, null, null)
} else {
null
}
TaskContext.setTaskContext(taskContext)
val consumer = KafkaDataConsumer.acquire(topicPartition, kafkaParams, useCache)
val consumer = KafkaDataConsumer.acquire(topicPartition, kafkaParams)
try {
val range = consumer.getAvailableOffsetRange()
val rcvd = range.earliest until range.latest map { offset =>
@ -147,4 +183,143 @@ class KafkaDataConsumerSuite extends SharedSparkSession with PrivateMethodTester
threadpool.shutdown()
}
}
test("SPARK-25151 Handles multiple tasks in executor fetching same (topic, partition) pair") {
prepareTestTopicHavingTestMessages(topic)
val topicPartition = new TopicPartition(topic, 0)
val kafkaParams = getKafkaParams()
withTaskContext(TaskContext.empty()) {
// task A trying to fetch offset 0 to 100, and read 5 records
val consumer1 = KafkaDataConsumer.acquire(topicPartition, kafkaParams)
val lastOffsetForConsumer1 = readAndGetLastOffset(consumer1, 0, 100, 5)
consumer1.release()
assertFetchedDataPoolStatistic(fetchedDataPool, expectedNumCreated = 1, expectedNumTotal = 1)
// task B trying to fetch offset 300 to 500, and read 5 records
val consumer2 = KafkaDataConsumer.acquire(topicPartition, kafkaParams)
val lastOffsetForConsumer2 = readAndGetLastOffset(consumer2, 300, 500, 5)
consumer2.release()
assertFetchedDataPoolStatistic(fetchedDataPool, expectedNumCreated = 2, expectedNumTotal = 2)
// task A continues reading from the last offset + 1, with upper bound 100 again
val consumer1a = KafkaDataConsumer.acquire(topicPartition, kafkaParams)
consumer1a.get(lastOffsetForConsumer1 + 1, 100, 10000, failOnDataLoss = false)
consumer1a.release()
// the pool should serve the cached data instead of creating new data
assertFetchedDataPoolStatistic(fetchedDataPool, expectedNumCreated = 2, expectedNumTotal = 2)
// task B also continues reading from the last offset + 1, with upper bound 500 again
val consumer2a = KafkaDataConsumer.acquire(topicPartition, kafkaParams)
consumer2a.get(lastOffsetForConsumer2 + 1, 500, 10000, failOnDataLoss = false)
consumer2a.release()
// same expectation: the pool should serve the cached data instead of creating new data
assertFetchedDataPoolStatistic(fetchedDataPool, expectedNumCreated = 2, expectedNumTotal = 2)
}
}
test("SPARK-25151 Handles multiple tasks in executor fetching same (topic, partition) pair " +
"and same offset (edge-case) - data in use") {
prepareTestTopicHavingTestMessages(topic)
val topicPartition = new TopicPartition(topic, 0)
val kafkaParams = getKafkaParams()
withTaskContext(TaskContext.empty()) {
// task A trying to fetch offset 0 to 100, and read 5 records (still reading)
val consumer1 = KafkaDataConsumer.acquire(topicPartition, kafkaParams)
val lastOffsetForConsumer1 = readAndGetLastOffset(consumer1, 0, 100, 5)
assertFetchedDataPoolStatistic(fetchedDataPool, expectedNumCreated = 1, expectedNumTotal = 1)
// task B tries to fetch from the offset right after the last offset task A has read so far
// - this is the condition for the edge case
val consumer2 = KafkaDataConsumer.acquire(topicPartition, kafkaParams)
consumer2.get(lastOffsetForConsumer1 + 1, 100, 10000, failOnDataLoss = false)
// The pool must create new fetched data instead of returning the existing one that is now
// in use, even though there's fetched data matching the start offset.
assertFetchedDataPoolStatistic(fetchedDataPool, expectedNumCreated = 2, expectedNumTotal = 2)
consumer1.release()
consumer2.release()
}
}
test("SPARK-25151 Handles multiple tasks in executor fetching same (topic, partition) pair " +
"and same offset (edge-case) - data not in use") {
prepareTestTopicHavingTestMessages(topic)
val topicPartition = new TopicPartition(topic, 0)
val kafkaParams = getKafkaParams()
withTaskContext(TaskContext.empty()) {
// task A fetches offsets 0 to 100, reads 5 records, and releases the consumer
val consumer1 = KafkaDataConsumer.acquire(topicPartition, kafkaParams)
val lastOffsetForConsumer1 = readAndGetLastOffset(consumer1, 0, 100, 5)
consumer1.release()
assertFetchedDataPoolStatistic(fetchedDataPool, expectedNumCreated = 1, expectedNumTotal = 1)
// task B tries to fetch from the offset right after the last offset task A has read
// - this is the condition for the edge case
val consumer2 = KafkaDataConsumer.acquire(topicPartition, kafkaParams)
consumer2.get(lastOffsetForConsumer1 + 1, 100, 10000, failOnDataLoss = false)
// The pool cannot determine the originating task, so it just provides the matching data.
// Task A may come back and try to fetch, and then not find its previous data
// (or find it in use). In that case task A may have to fetch from Kafka again, but we
// already avoided a fetch from Kafka in task B, so overall it is not a big deal.
assertFetchedDataPoolStatistic(fetchedDataPool, expectedNumCreated = 1, expectedNumTotal = 1)
consumer2.release()
}
}
private def assertFetchedDataPoolStatistic(
fetchedDataPool: FetchedDataPool,
expectedNumCreated: Long,
expectedNumTotal: Long): Unit = {
assert(fetchedDataPool.numCreated === expectedNumCreated)
assert(fetchedDataPool.numTotal === expectedNumTotal)
}
private def readAndGetLastOffset(
consumer: KafkaDataConsumer,
startOffset: Long,
untilOffset: Long,
numToRead: Int): Long = {
var lastOffset: Long = startOffset - 1
(0 until numToRead).foreach { _ =>
val record = consumer.get(lastOffset + 1, untilOffset, 10000, failOnDataLoss = false)
// validation of the fetched record is covered by other tests, so we skip it here
lastOffset = record.offset()
}
lastOffset
}
private def prepareTestTopicHavingTestMessages(topic: String) = {
val data = (1 to 1000).map(_.toString)
testUtils.createTopic(topic, 1)
testUtils.sendMessages(topic, data.toArray)
data
}
private def withTaskContext(context: TaskContext)(task: => Unit): Unit = {
try {
TaskContext.setTaskContext(context)
task
} finally {
TaskContext.unset()
}
}
}


@ -1146,7 +1146,6 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
KafkaSourceOffset(Map(tp -> 100L))).map(_.asInstanceOf[KafkaBatchInputPartition])
withClue(s"minPartitions = $minPartitions generated factories $inputPartitions\n\t") {
assert(inputPartitions.size == numPartitionsGenerated)
inputPartitions.foreach { f => assert(f.reuseKafkaConsumer == reusesConsumers) }
}
}
}


@ -180,6 +180,8 @@
<commons-lang2.version>2.6</commons-lang2.version>
<!-- org.apache.commons/commons-lang3/-->
<commons-lang3.version>3.8.1</commons-lang3.version>
<!-- org.apache.commons/commons-pool2/-->
<commons-pool2.version>2.6.2</commons-pool2.version>
<datanucleus-core.version>3.2.10</datanucleus-core.version>
<janino.version>3.0.15</janino.version>
<jersey.version>2.29</jersey.version>