[SPARK-35611][SS] Introduce the strategy on mismatched offset for start offset timestamp on Kafka data source

### What changes were proposed in this pull request?

This PR proposes to introduce a strategy for handling a mismatched offset when the start offset is specified by timestamp on the Kafka data source.

Please read the section `Why are the changes needed?` to understand the rationale for the functionality.

This is particularly helpful when there is skew between partitions and some partitions only have older records.

* AS-IS: Spark simply fails the query, and end users have to deal with workarounds requiring manual steps.
* TO-BE: Spark assigns the latest offset to these partitions, so that it can read newer records from them in subsequent micro-batches.

To retain the existing behavior while also enabling the proposed "TO-BE" behavior, we introduce a strategy for mismatched offsets on the start offset timestamp and let end users choose between them.

The strategy is added as a source option, so that end users set the behavior explicitly (otherwise they simply get the well-known default value).

* New source option: `startingOffsetsByTimestampStrategy`
* Available values: `error` (fail the query, i.e. the AS-IS behavior), `latest` (set the offset to the latest, i.e. the TO-BE behavior); see the usage sketch below
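
For illustration, here is a minimal sketch of how an end user would opt into the new behavior; the topic name, bootstrap servers, and timestamp value are placeholders, not from this PR:

```scala
// Sketch only: start reading from a timestamp, and for partitions that have no record
// at or after that timestamp, fall back to the latest offset instead of failing.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")      // placeholder
  .option("subscribe", "events")                           // placeholder
  .option("startingTimestamp", "1622505600000")            // epoch millis for all partitions
  .option("startingOffsetsByTimestampStrategy", "latest")  // new option; default is "error"
  .load()
```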

Doc changes are as follows:

![ES-106042-doc-screenshot-1](https://user-images.githubusercontent.com/1317309/120472697-2c1ba800-c3e1-11eb-884f-f28152168053.png)
![ES-106042-doc-screenshot-2](https://user-images.githubusercontent.com/1317309/120472719-33db4c80-c3e1-11eb-9851-939be8a3ddb7.png)

### Why are the changes needed?

We encountered a real-world case where Spark fails the query if some of the partitions don't have a matching offset for the given timestamp.

This is intended behavior, to avoid producing unintended output in cases like the following:

* timestamp 2 is specified as the timestamp offset, but some of the partitions don't have a matching record yet
* a record with timestamp 1 arrives "later", in a following micro-batch

which is possible since Kafka allows producers to specify the timestamp in the record.

The unintended output mentioned above is the risk of reading the record with timestamp 1 in the next micro-batch even though the option specified timestamp 2.

But in many cases end users simply assume that timestamps increase monotonically and that wall clocks are all in sync, and the current behavior blocks these cases from making progress.
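
For context, a record's timestamp can be supplied by the producer, so a record carrying an older timestamp can be appended after newer ones; a minimal sketch of that, with placeholder topic and broker values:

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Sketch only: a producer can set the record timestamp explicitly, so a record carrying
// an older timestamp (here, one minute in the past) can land after newer records.
val props = new Properties()
props.put("bootstrap.servers", "broker1:9092") // placeholder
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)
val olderTimestamp = System.currentTimeMillis() - 60000L
producer.send(new ProducerRecord[String, String](
  "events",                               // topic (placeholder)
  Integer.valueOf(0),                     // partition
  java.lang.Long.valueOf(olderTimestamp), // explicit record timestamp
  "key", "value"))
producer.close()
```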

### Does this PR introduce _any_ user-facing change?

Yes, but it is not a breaking change. End users choose the behavior, and the default value is "error" (the current behavior). Since it is a source option (not a config), they need to set it explicitly for the new functionality to take effect.

### How was this patch tested?

New UTs.

Closes #32747 from HeartSaVioR/SPARK-35611.

Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
Commit: 4a6d90e187 (parent: 974d127c4f)
Committed: 2021-06-21 00:37:42 -07:00 by Liang-Chi Hsieh
10 changed files with 323 additions and 112 deletions


@ -369,7 +369,7 @@ The following configurations are optional:
<td>streaming and batch</td>
<td>The start point of timestamp when a query is started, a string specifying a starting timestamp for
all partitions in topics being subscribed. Please refer the details on timestamp offset options below. If Kafka doesn't return the matched offset,
the query will fail immediately to prevent unintended read from such partition. (This is a kind of limitation as of now, and will be addressed in near future.)<p/>
the behavior will follow to the value of the option <code>startingOffsetsByTimestampStrategy</code><p/>
<p/>
Note1: <code>startingTimestamp</code> takes precedence over <code>startingOffsetsByTimestamp</code> and <code>startingOffsets</code>.<p/>
Note2: For streaming queries, this only applies when a new query is started, and that resuming will
@ -385,7 +385,7 @@ The following configurations are optional:
<td>streaming and batch</td>
<td>The start point of timestamp when a query is started, a json string specifying a starting timestamp for
each TopicPartition. Please refer the details on timestamp offset options below. If Kafka doesn't return the matched offset,
the query will fail immediately to prevent unintended read from such partition. (This is a kind of limitation as of now, and will be addressed in near future.)<p/>
the behavior will follow to the value of the option <code>startingOffsetsByTimestampStrategy</code><p/>
<p/>
Note1: <code>startingOffsetsByTimestamp</code> takes precedence over <code>startingOffsets</code>.<p/>
Note2: For streaming queries, this only applies when a new query is started, and that resuming will
@ -539,6 +539,16 @@ The following configurations are optional:
<td>streaming and batch</td>
<td>Whether to include the Kafka headers in the row.</td>
</tr>
<tr>
<td>startingOffsetsByTimestampStrategy</td>
<td>"error" or "latest"</td>
<td>"error"</td>
<td>streaming and batch</td>
<td>Defines the behavior when the starting offset by timestamp is specified (either global or per partition), and Kafka doesn't return the matched offset.<p/>
<p/>
"error": fail the query.<p/>
"latest": set the offset to the latest, so that further new records in the partition are being read.<p/></td>
</tr>
</table>
### Details on timestamp offset options


@ -70,10 +70,10 @@ class KafkaContinuousStream(
case EarliestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets(None))
case SpecificOffsetRangeLimit(p) => offsetReader.fetchSpecificOffsets(p, reportDataLoss)
case SpecificTimestampRangeLimit(p) => offsetReader.fetchSpecificTimestampBasedOffsets(p,
failsOnNoMatchingOffset = true)
case GlobalTimestampRangeLimit(ts) => offsetReader.fetchGlobalTimestampBasedOffsets(
ts, failsOnNoMatchingOffset = true)
case SpecificTimestampRangeLimit(p, strategy) =>
offsetReader.fetchSpecificTimestampBasedOffsets(p, isStartingOffsets = true, strategy)
case GlobalTimestampRangeLimit(ts, strategy) =>
offsetReader.fetchGlobalTimestampBasedOffsets(ts, isStartingOffsets = true, strategy)
}
logInfo(s"Initial offsets: $offsets")
offsets


@ -233,10 +233,12 @@ private[kafka010] class KafkaMicroBatchStream(
KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets(None))
case SpecificOffsetRangeLimit(p) =>
kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss)
case SpecificTimestampRangeLimit(p) =>
kafkaOffsetReader.fetchSpecificTimestampBasedOffsets(p, failsOnNoMatchingOffset = true)
case GlobalTimestampRangeLimit(ts) =>
kafkaOffsetReader.fetchGlobalTimestampBasedOffsets(ts, failsOnNoMatchingOffset = true)
case SpecificTimestampRangeLimit(p, strategy) =>
kafkaOffsetReader.fetchSpecificTimestampBasedOffsets(p,
isStartingOffsets = true, strategy)
case GlobalTimestampRangeLimit(ts, strategy) =>
kafkaOffsetReader.fetchGlobalTimestampBasedOffsets(ts,
isStartingOffsets = true, strategy)
}
metadataLog.add(0, offsets)
logInfo(s"Initial offsets: $offsets")


@ -19,6 +19,8 @@ package org.apache.spark.sql.kafka010
import org.apache.kafka.common.TopicPartition
import org.apache.spark.sql.kafka010.KafkaSourceProvider.StrategyOnNoMatchStartingOffset
/**
* Objects that represent desired offset range limits for starting,
* ending, and specific offsets.
@ -47,14 +49,18 @@ private[kafka010] case class SpecificOffsetRangeLimit(
* greater than specific timestamp.
*/
private[kafka010] case class SpecificTimestampRangeLimit(
topicTimestamps: Map[TopicPartition, Long]) extends KafkaOffsetRangeLimit
topicTimestamps: Map[TopicPartition, Long],
strategyOnNoMatchingStartingOffset: StrategyOnNoMatchStartingOffset.Value)
extends KafkaOffsetRangeLimit
/**
* Represents the desire to bind to earliest offset which timestamp for the offset is equal or
* greater than specific timestamp. This applies the timestamp to the all topics/partitions.
*/
private[kafka010] case class GlobalTimestampRangeLimit(
timestamp: Long) extends KafkaOffsetRangeLimit
timestamp: Long,
strategyOnNoMatchingStartingOffset: StrategyOnNoMatchStartingOffset.Value)
extends KafkaOffsetRangeLimit
private[kafka010] object KafkaOffsetRangeLimit {
/**


@ -24,6 +24,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.kafka010.KafkaSourceProvider.StrategyOnNoMatchStartingOffset
/**
* Base trait to fetch offsets from Kafka. The implementations are
@ -66,30 +67,40 @@ private[kafka010] trait KafkaOffsetReader {
/**
* Resolves the specific offsets based on timestamp per topic-partition.
* The returned offset for each partition is the earliest offset whose timestamp is greater
* than or equal to the given timestamp in the corresponding partition. If the matched offset
* doesn't exist, depending on `failsOnNoMatchingOffset` parameter, the offset will be set to
* latest or this method throws an error.
* than or equal to the given timestamp in the corresponding partition.
*
* If the matched offset doesn't exist, the behavior depends on the destination and the option:
*
* - isStartingOffsets = false => implementation should provide the offset same as 'latest'
* - isStartingOffsets = true => implementation should follow the strategy on non-matching
* starting offset, passed as `strategyOnNoMatchStartingOffset`
*
* @param partitionTimestamps the timestamp per topic-partition.
* @param failsOnNoMatchingOffset whether to fail the query when no matched offset can be found.
*/
def fetchSpecificTimestampBasedOffsets(
partitionTimestamps: Map[TopicPartition, Long],
failsOnNoMatchingOffset: Boolean): KafkaSourceOffset
isStartingOffsets: Boolean,
strategyOnNoMatchStartingOffset: StrategyOnNoMatchStartingOffset.Value)
: KafkaSourceOffset
/**
* Resolves the specific offsets based on timestamp per all topic-partitions being subscribed.
* The returned offset for each partition is the earliest offset whose timestamp is greater
* than or equal to the given timestamp in the corresponding partition. If the matched offset
* doesn't exist, depending on `failsOnNoMatchingOffset` parameter, the offset will be set to
* latest or this method throws an error.
* than or equal to the given timestamp in the corresponding partition.
*
* If the matched offset doesn't exist, the behavior depends on the destination and the option:
*
* - isStartingOffsets = false => implementation should provide the offset same as 'latest'
* - isStartingOffsets = true => implementation should follow the strategy on non-matching
* starting offset, passed as `strategyOnNoMatchStartingOffset`
*
* @param timestamp the timestamp.
* @param failsOnNoMatchingOffset whether to fail the query when no matched offset can be found.
*/
def fetchGlobalTimestampBasedOffsets(
timestamp: Long,
failsOnNoMatchingOffset: Boolean): KafkaSourceOffset
isStartingOffsets: Boolean,
strategyOnNoMatchingStartingOffset: StrategyOnNoMatchStartingOffset.Value)
: KafkaSourceOffset
/**
* Fetch the earliest offsets for the topic partitions that are indicated


@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal
import org.apache.kafka.clients.admin.{Admin, ListOffsetsOptions, OffsetSpec}
import org.apache.kafka.clients.admin.{Admin, ListOffsetsOptions, ListOffsetsResult, OffsetSpec}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.{IsolationLevel, TopicPartition}
import org.apache.kafka.common.requests.OffsetFetchResponse
@ -33,6 +33,7 @@ import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.ExecutorCacheTaskLocation
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.kafka010.KafkaSourceProvider.StrategyOnNoMatchStartingOffset
/**
* This class uses Kafka's own [[Admin]] API to read data offsets from Kafka.
@ -135,12 +136,12 @@ private[kafka010] class KafkaOffsetReaderAdmin(
}.toMap
case SpecificOffsetRangeLimit(partitionOffsets) =>
validateTopicPartitions(partitions, partitionOffsets)
case SpecificTimestampRangeLimit(partitionTimestamps) =>
fetchSpecificTimestampBasedOffsets(partitionTimestamps,
failsOnNoMatchingOffset = isStartingOffsets).partitionToOffsets
case GlobalTimestampRangeLimit(timestamp) =>
fetchGlobalTimestampBasedOffsets(timestamp,
failsOnNoMatchingOffset = isStartingOffsets).partitionToOffsets
case SpecificTimestampRangeLimit(partitionTimestamps, strategyOnNoMatchingStartingOffset) =>
fetchSpecificTimestampBasedOffsets(partitionTimestamps, isStartingOffsets,
strategyOnNoMatchingStartingOffset).partitionToOffsets
case GlobalTimestampRangeLimit(timestamp, strategyOnNoMatchingStartingOffset) =>
fetchGlobalTimestampBasedOffsets(timestamp, isStartingOffsets,
strategyOnNoMatchingStartingOffset).partitionToOffsets
}
}
@ -164,7 +165,10 @@ private[kafka010] class KafkaOffsetReaderAdmin(
override def fetchSpecificTimestampBasedOffsets(
partitionTimestamps: Map[TopicPartition, Long],
failsOnNoMatchingOffset: Boolean): KafkaSourceOffset = {
isStartingOffsets: Boolean,
strategyOnNoMatchStartingOffset: StrategyOnNoMatchStartingOffset.Value)
: KafkaSourceOffset = {
val fnAssertParametersWithPartitions: ju.Set[TopicPartition] => Unit = { partitions =>
assert(partitions.asScala == partitionTimestamps.keySet,
"If starting/endingOffsetsByTimestamp contains specific offsets, you must specify all " +
@ -172,34 +176,27 @@ private[kafka010] class KafkaOffsetReaderAdmin(
logDebug(s"Assigned partitions: $partitions. Seeking to $partitionTimestamps")
}
val fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long] = { _ => {
val listOffsetsParams = partitionTimestamps.map { case (tp, timestamp) =>
tp -> OffsetSpec.forTimestamp(timestamp)
}.asJava
admin.listOffsets(listOffsetsParams, listOffsetsOptions).all().get().asScala.map {
case (tp, offsetSpec) =>
if (failsOnNoMatchingOffset) {
assert(offsetSpec.offset() != OffsetFetchResponse.INVALID_OFFSET, "No offset " +
s"matched from request of topic-partition $tp and timestamp " +
s"${partitionTimestamps(tp)}.")
}
val fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long] = { _ =>
val listOffsetsParams = partitionTimestamps.map { case (tp, timestamp) =>
tp -> OffsetSpec.forTimestamp(timestamp)
}.asJava
if (offsetSpec.offset() == OffsetFetchResponse.INVALID_OFFSET) {
tp -> KafkaOffsetRangeLimit.LATEST
} else {
tp -> offsetSpec.offset()
}
}.toMap
}
readTimestampOffsets(
admin.listOffsets(listOffsetsParams, listOffsetsOptions).all().get().asScala.toMap,
isStartingOffsets,
strategyOnNoMatchStartingOffset,
partitionTimestamps)
}
fetchSpecificOffsets0(fnAssertParametersWithPartitions, fnRetrievePartitionOffsets)
}
override def fetchGlobalTimestampBasedOffsets(
timestamp: Long,
failsOnNoMatchingOffset: Boolean): KafkaSourceOffset = {
isStartingOffsets: Boolean,
strategyOnNoMatchStartingOffset: StrategyOnNoMatchStartingOffset.Value)
: KafkaSourceOffset = {
val fnAssertParametersWithPartitions: ju.Set[TopicPartition] => Unit = { partitions =>
logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $timestamp")
}
@ -208,25 +205,47 @@ private[kafka010] class KafkaOffsetReaderAdmin(
val listOffsetsParams = tps.asScala.map { tp =>
tp -> OffsetSpec.forTimestamp(timestamp)
}.toMap.asJava
admin.listOffsets(listOffsetsParams, listOffsetsOptions).all().get().asScala.map {
case (tp, offsetSpec) =>
if (failsOnNoMatchingOffset) {
assert(offsetSpec.offset() != OffsetFetchResponse.INVALID_OFFSET, "No offset " +
s"matched from request of topic-partition $tp and timestamp " +
s"$timestamp.")
}
if (offsetSpec.offset() == OffsetFetchResponse.INVALID_OFFSET) {
tp -> KafkaOffsetRangeLimit.LATEST
} else {
tp -> offsetSpec.offset()
}
}.toMap
readTimestampOffsets(
admin.listOffsets(listOffsetsParams, listOffsetsOptions).all().get().asScala.toMap,
isStartingOffsets,
strategyOnNoMatchStartingOffset,
_ => timestamp
)
}
fetchSpecificOffsets0(fnAssertParametersWithPartitions, fnRetrievePartitionOffsets)
}
private def readTimestampOffsets(
tpToOffsetMap: Map[TopicPartition, ListOffsetsResult.ListOffsetsResultInfo],
isStartingOffsets: Boolean,
strategyOnNoMatchStartingOffset: StrategyOnNoMatchStartingOffset.Value,
partitionTimestampFn: TopicPartition => Long): Map[TopicPartition, Long] = {
tpToOffsetMap.map { case (tp, offsetSpec) =>
val offset = if (offsetSpec.offset() == OffsetFetchResponse.INVALID_OFFSET) {
if (isStartingOffsets) {
strategyOnNoMatchStartingOffset match {
case StrategyOnNoMatchStartingOffset.ERROR =>
throw new IllegalArgumentException("No offset " +
s"matched from request of topic-partition $tp and timestamp " +
s"${partitionTimestampFn(tp)}.")
case StrategyOnNoMatchStartingOffset.LATEST =>
KafkaOffsetRangeLimit.LATEST
}
} else {
KafkaOffsetRangeLimit.LATEST
}
} else {
offsetSpec.offset()
}
tp -> offset
}.toMap
}
private def fetchSpecificOffsets0(
fnAssertParametersWithPartitions: ju.Set[TopicPartition] => Unit,
fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long]


@ -30,6 +30,7 @@ import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.ExecutorCacheTaskLocation
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.kafka010.KafkaSourceProvider.StrategyOnNoMatchStartingOffset
import org.apache.spark.util.{UninterruptibleThread, UninterruptibleThreadRunner}
/**
@ -157,12 +158,12 @@ private[kafka010] class KafkaOffsetReaderConsumer(
}.toMap
case SpecificOffsetRangeLimit(partitionOffsets) =>
validateTopicPartitions(partitions, partitionOffsets)
case SpecificTimestampRangeLimit(partitionTimestamps) =>
case SpecificTimestampRangeLimit(partitionTimestamps, strategy) =>
fetchSpecificTimestampBasedOffsets(partitionTimestamps,
failsOnNoMatchingOffset = isStartingOffsets).partitionToOffsets
case GlobalTimestampRangeLimit(timestamp) =>
isStartingOffsets, strategy).partitionToOffsets
case GlobalTimestampRangeLimit(timestamp, strategy) =>
fetchGlobalTimestampBasedOffsets(timestamp,
failsOnNoMatchingOffset = isStartingOffsets).partitionToOffsets
isStartingOffsets, strategy).partitionToOffsets
}
}
@ -200,7 +201,10 @@ private[kafka010] class KafkaOffsetReaderConsumer(
override def fetchSpecificTimestampBasedOffsets(
partitionTimestamps: Map[TopicPartition, Long],
failsOnNoMatchingOffset: Boolean): KafkaSourceOffset = {
isStartingOffsets: Boolean,
strategyOnNoMatchStartingOffset: StrategyOnNoMatchStartingOffset.Value)
: KafkaSourceOffset = {
val fnAssertParametersWithPartitions: ju.Set[TopicPartition] => Unit = { partitions =>
assert(partitions.asScala == partitionTimestamps.keySet,
"If starting/endingOffsetsByTimestamp contains specific offsets, you must specify all " +
@ -208,27 +212,19 @@ private[kafka010] class KafkaOffsetReaderConsumer(
logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $partitionTimestamps")
}
val fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long] = { _ => {
val converted = partitionTimestamps.map { case (tp, timestamp) =>
tp -> java.lang.Long.valueOf(timestamp)
}.asJava
val fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long] = { _ =>
val converted = partitionTimestamps.map { case (tp, timestamp) =>
tp -> java.lang.Long.valueOf(timestamp)
}.asJava
val offsetForTime: ju.Map[TopicPartition, OffsetAndTimestamp] =
consumer.offsetsForTimes(converted)
val offsetForTime: ju.Map[TopicPartition, OffsetAndTimestamp] =
consumer.offsetsForTimes(converted)
offsetForTime.asScala.map { case (tp, offsetAndTimestamp) =>
if (failsOnNoMatchingOffset) {
assert(offsetAndTimestamp != null, "No offset matched from request of " +
s"topic-partition $tp and timestamp ${partitionTimestamps(tp)}.")
}
if (offsetAndTimestamp == null) {
tp -> KafkaOffsetRangeLimit.LATEST
} else {
tp -> offsetAndTimestamp.offset()
}
}.toMap
}
readTimestampOffsets(
offsetForTime.asScala.toMap,
isStartingOffsets,
strategyOnNoMatchStartingOffset,
partitionTimestamps)
}
val fnAssertFetchedOffsets: Map[TopicPartition, Long] => Unit = { _ => }
@ -239,7 +235,10 @@ private[kafka010] class KafkaOffsetReaderConsumer(
override def fetchGlobalTimestampBasedOffsets(
timestamp: Long,
failsOnNoMatchingOffset: Boolean): KafkaSourceOffset = {
isStartingOffsets: Boolean,
strategyOnNoMatchStartingOffset: StrategyOnNoMatchStartingOffset.Value)
: KafkaSourceOffset = {
val fnAssertParametersWithPartitions: ju.Set[TopicPartition] => Unit = { partitions =>
logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $timestamp")
}
@ -250,18 +249,11 @@ private[kafka010] class KafkaOffsetReaderConsumer(
val offsetForTime: ju.Map[TopicPartition, OffsetAndTimestamp] =
consumer.offsetsForTimes(converted)
offsetForTime.asScala.map { case (tp, offsetAndTimestamp) =>
if (failsOnNoMatchingOffset) {
assert(offsetAndTimestamp != null, "No offset matched from request of " +
s"topic-partition $tp and timestamp $timestamp.")
}
if (offsetAndTimestamp == null) {
tp -> KafkaOffsetRangeLimit.LATEST
} else {
tp -> offsetAndTimestamp.offset()
}
}.toMap
readTimestampOffsets(
offsetForTime.asScala.toMap,
isStartingOffsets,
strategyOnNoMatchStartingOffset,
_ => timestamp)
}
val fnAssertFetchedOffsets: Map[TopicPartition, Long] => Unit = { _ => }
@ -270,6 +262,38 @@ private[kafka010] class KafkaOffsetReaderConsumer(
fnAssertFetchedOffsets)
}
private def readTimestampOffsets(
tpToOffsetMap: Map[TopicPartition, OffsetAndTimestamp],
isStartingOffsets: Boolean,
strategyOnNoMatchStartingOffset: StrategyOnNoMatchStartingOffset.Value,
partitionTimestampFn: TopicPartition => Long): Map[TopicPartition, Long] = {
tpToOffsetMap.map { case (tp, offsetSpec) =>
val offset = if (offsetSpec == null) {
if (isStartingOffsets) {
strategyOnNoMatchStartingOffset match {
case StrategyOnNoMatchStartingOffset.ERROR =>
// This is to match the old behavior - we used assert to check the condition.
// scalastyle:off throwerror
throw new AssertionError("No offset " +
s"matched from request of topic-partition $tp and timestamp " +
s"${partitionTimestampFn(tp)}.")
// scalastyle:on throwerror
case StrategyOnNoMatchStartingOffset.LATEST =>
KafkaOffsetRangeLimit.LATEST
}
} else {
KafkaOffsetRangeLimit.LATEST
}
} else {
offsetSpec.offset()
}
tp -> offset
}.toMap
}
private def fetchSpecificOffsets0(
fnAssertParametersWithPartitions: ju.Set[TopicPartition] => Unit,
fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long],


@ -111,10 +111,10 @@ private[kafka010] class KafkaSource(
case EarliestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchEarliestOffsets())
case LatestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchLatestOffsets(None))
case SpecificOffsetRangeLimit(p) => kafkaReader.fetchSpecificOffsets(p, reportDataLoss)
case SpecificTimestampRangeLimit(p) =>
kafkaReader.fetchSpecificTimestampBasedOffsets(p, failsOnNoMatchingOffset = true)
case GlobalTimestampRangeLimit(ts) =>
kafkaReader.fetchGlobalTimestampBasedOffsets(ts, failsOnNoMatchingOffset = true)
case SpecificTimestampRangeLimit(p, strategy) =>
kafkaReader.fetchSpecificTimestampBasedOffsets(p, isStartingOffsets = true, strategy)
case GlobalTimestampRangeLimit(ts, strategy) =>
kafkaReader.fetchGlobalTimestampBasedOffsets(ts, isStartingOffsets = true, strategy)
}
metadataLog.add(0, offsets)
logInfo(s"Initial offsets: $offsets")


@ -545,9 +545,15 @@ private[kafka010] object KafkaSourceProvider extends Logging {
private[kafka010] val FETCH_OFFSET_NUM_RETRY = "fetchoffset.numretries"
private[kafka010] val FETCH_OFFSET_RETRY_INTERVAL_MS = "fetchoffset.retryintervalms"
private[kafka010] val CONSUMER_POLL_TIMEOUT = "kafkaconsumer.polltimeoutms"
private[kafka010] val STARTING_OFFSETS_BY_TIMESTAMP_STRATEGY_KEY =
"startingoffsetsbytimestampstrategy"
private val GROUP_ID_PREFIX = "groupidprefix"
private[kafka010] val INCLUDE_HEADERS = "includeheaders"
private[kafka010] object StrategyOnNoMatchStartingOffset extends Enumeration {
val ERROR, LATEST = Value
}
val TOPIC_OPTION_KEY = "topic"
val INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE =
@ -588,12 +594,16 @@ private[kafka010] object KafkaSourceProvider extends Logging {
defaultOffsets: KafkaOffsetRangeLimit): KafkaOffsetRangeLimit = {
// The order below represents "preferences"
val strategyOnNoMatchStartingOffset = params.get(STARTING_OFFSETS_BY_TIMESTAMP_STRATEGY_KEY)
.map(v => StrategyOnNoMatchStartingOffset.withName(v.toUpperCase(Locale.ROOT)))
.getOrElse(StrategyOnNoMatchStartingOffset.ERROR)
if (params.contains(globalOffsetTimestampOptionKey)) {
// 1. global timestamp
val tsStr = params(globalOffsetTimestampOptionKey).trim
try {
val ts = tsStr.toLong
GlobalTimestampRangeLimit(ts)
GlobalTimestampRangeLimit(ts, strategyOnNoMatchStartingOffset)
} catch {
case _: NumberFormatException =>
throw new IllegalArgumentException(s"Expected a single long value, got $tsStr")
@ -601,7 +611,8 @@ private[kafka010] object KafkaSourceProvider extends Logging {
} else if (params.contains(offsetByTimestampOptionKey)) {
// 2. timestamp per topic partition
val json = params(offsetByTimestampOptionKey).trim
SpecificTimestampRangeLimit(JsonUtils.partitionTimestamps(json))
SpecificTimestampRangeLimit(JsonUtils.partitionTimestamps(json),
strategyOnNoMatchStartingOffset)
} else {
// 3. latest/earliest/offset
params.get(offsetOptionKey).map(_.trim) match {


@ -1635,8 +1635,7 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
testFromSpecificOffsets(
topic,
failOnDataLoss = failOnDataLoss,
"assign" -> assignString(topic, 0 to 4),
"failOnDataLoss" -> failOnDataLoss.toString)
"assign" -> assignString(topic, 0 to 4))
}
test(s"assign from specific timestamps (failOnDataLoss: $failOnDataLoss)") {
@ -1645,8 +1644,7 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
topic,
failOnDataLoss = failOnDataLoss,
addPartitions = false,
"assign" -> assignString(topic, 0 to 4),
"failOnDataLoss" -> failOnDataLoss.toString)
"assign" -> assignString(topic, 0 to 4))
}
test(s"assign from global timestamp per topic (failOnDataLoss: $failOnDataLoss)") {
@ -1655,8 +1653,7 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
topic,
failOnDataLoss = failOnDataLoss,
addPartitions = false,
"assign" -> assignString(topic, 0 to 4),
"failOnDataLoss" -> failOnDataLoss.toString)
"assign" -> assignString(topic, 0 to 4))
}
test(s"subscribing topic by name from latest offsets (failOnDataLoss: $failOnDataLoss)") {
@ -1747,6 +1744,137 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
}
}
test("subscribing topic by name from specific timestamps with non-matching starting offset") {
val topic = newTopic()
testFromSpecificTimestampsWithNoMatchingStartingOffset(topic, "subscribe" -> topic)
}
test("subscribing topic by name from global timestamp per topic with " +
"non-matching starting offset") {
val topic = newTopic()
testFromGlobalTimestampWithNoMatchingStartingOffset(topic, "subscribe" -> topic)
}
test("subscribing topic by pattern from specific timestamps with " +
"non-matching starting offset") {
val topicPrefix = newTopic()
val topic = topicPrefix + "-suffix"
testFromSpecificTimestampsWithNoMatchingStartingOffset(topic,
"subscribePattern" -> s"$topicPrefix-.*")
}
test("subscribing topic by pattern from global timestamp per topic with " +
"non-matching starting offset") {
val topicPrefix = newTopic()
val topic = topicPrefix + "-suffix"
testFromGlobalTimestampWithNoMatchingStartingOffset(topic,
"subscribePattern" -> s"$topicPrefix-.*")
}
private def testFromSpecificTimestampsWithNoMatchingStartingOffset(
topic: String,
options: (String, String)*): Unit = {
testUtils.createTopic(topic, partitions = 5)
val firstTimestamp = System.currentTimeMillis() - 5000
val secondTimestamp = firstTimestamp + 1000
setupTestMessagesForTestOnTimestampOffsets(topic, firstTimestamp, secondTimestamp)
// no data after second timestamp for partition 4
require(testUtils.getLatestOffsets(Set(topic)).size === 5)
// here we starts from second timestamp for all partitions, whereas we know there's
// no data in partition 4 matching second timestamp
val startPartitionTimestamps: Map[TopicPartition, Long] =
(0 to 4).map(new TopicPartition(topic, _) -> secondTimestamp).toMap
val startingTimestamps = JsonUtils.partitionTimestamps(startPartitionTimestamps)
val mapped = setupDataFrameForTestOnTimestampOffsets(startingTimestamps, failOnDataLoss = true,
options: _*)
assertQueryFailOnStartOffsetStrategyAsError(mapped)
val mapped2 = setupDataFrameForTestOnTimestampOffsets(startingTimestamps, failOnDataLoss = true,
options :+ ("startingoffsetsbytimestampstrategy", "error"): _*)
assertQueryFailOnStartOffsetStrategyAsError(mapped2)
val mapped3 = setupDataFrameForTestOnTimestampOffsets(startingTimestamps, failOnDataLoss = true,
options :+ ("startingoffsetsbytimestampstrategy", "latest"): _*)
testStream(mapped3)(
makeSureGetOffsetCalled,
Execute { q =>
val partitions = (0 to 4).map(new TopicPartition(topic, _))
// wait to reach the last offset in every partition
q.awaitOffset(
0, KafkaSourceOffset(partitions.map(tp => tp -> 3L).toMap), streamingTimeout.toMillis)
},
CheckAnswer(-21, -22, -11, -12, 2, 12),
Execute { q =>
sendMessagesWithTimestamp(topic, Array(23, 24, 25).map(_.toString), 4, secondTimestamp)
// wait to reach the new last offset in every partition
val partitions = (0 to 3).map(new TopicPartition(topic, _)).map(tp => tp -> 3L) ++
Seq(new TopicPartition(topic, 4) -> 6L)
q.awaitOffset(
0, KafkaSourceOffset(partitions.toMap), streamingTimeout.toMillis)
},
CheckNewAnswer(23, 24, 25)
)
}
private def testFromGlobalTimestampWithNoMatchingStartingOffset(
topic: String,
options: (String, String)*): Unit = {
testUtils.createTopic(topic, partitions = 5)
val firstTimestamp = System.currentTimeMillis() - 5000
val secondTimestamp = firstTimestamp + 1000
setupTestMessagesForTestOnTimestampOffsets(topic, firstTimestamp, secondTimestamp)
require(testUtils.getLatestOffsets(Set(topic)).size === 5)
// here we starts from second timestamp for all partitions, whereas we know there's
// no data in partition 4 matching second timestamp
val mapped = setupDataFrameForTestOnGlobalTimestamp(secondTimestamp, failOnDataLoss = true,
options: _*)
assertQueryFailOnStartOffsetStrategyAsError(mapped)
val mapped2 = setupDataFrameForTestOnGlobalTimestamp(secondTimestamp, failOnDataLoss = true,
options :+ ("startingoffsetsbytimestampstrategy", "error"): _*)
assertQueryFailOnStartOffsetStrategyAsError(mapped2)
val mapped3 = setupDataFrameForTestOnGlobalTimestamp(secondTimestamp, failOnDataLoss = true,
options :+ ("startingoffsetsbytimestampstrategy", "latest"): _*)
testStream(mapped3)(
makeSureGetOffsetCalled,
Execute { q =>
val partitions = (0 to 4).map(new TopicPartition(topic, _))
// wait to reach the last offset in every partition
q.awaitOffset(
0, KafkaSourceOffset(partitions.map(tp => tp -> 3L).toMap), streamingTimeout.toMillis)
},
CheckAnswer(-21, -22, -11, -12, 2, 12),
Execute { q =>
sendMessagesWithTimestamp(topic, Array(23, 24, 25).map(_.toString), 4, secondTimestamp)
// wait to reach the new last offset in every partition
val partitions = (0 to 3).map(new TopicPartition(topic, _)).map(tp => tp -> 3L) ++
Seq(new TopicPartition(topic, 4) -> 6L)
q.awaitOffset(
0, KafkaSourceOffset(partitions.toMap), streamingTimeout.toMillis)
},
CheckNewAnswer(23, 24, 25)
)
}
private def assertQueryFailOnStartOffsetStrategyAsError(df: Dataset[_]): Unit = {
// In continuous mode, the origin exception is not caught here unfortunately, so we have to
// stick with checking general exception instead of verifying IllegalArgumentException.
intercept[Exception] {
testStream(df)(makeSureGetOffsetCalled)
}
}
test("bad source options") {
def testBadOptions(options: (String, String)*)(expectedMsgs: String*): Unit = {
val ex = intercept[IllegalArgumentException] {