[SPARK-29223][SQL][SS] New option to specify timestamp on all subscribing topic-partitions in Kafka source

### What changes were proposed in this pull request?

This patch is a follow-up of SPARK-26848 (#23747). In SPARK-26848, we decided to open up the possibility of letting end users set an individual timestamp per partition. In many cases, however, specifying a timestamp expresses the intention to go back to a specific point in time and reprocess records, which should apply to all topics and partitions.

This patch proposes a way to set a global timestamp across all topic-partitions the source is subscribing to, so that end users can easily rewind all offsets to a specific timestamp. To keep configuration simple, each new option receives only a single timestamp for the start/end position.

New options introduced in this PR:

* startingTimestamp
* endingTimestamp

Both options receive the timestamp as a string.

Since there are now three options for starting offsets and three options for ending offsets, the priorities are as follows:

* starting offsets: startingTimestamp -> startingOffsetsByTimestamp -> startingOffsets
* ending offsets: endingTimestamp -> endingOffsetsByTimestamp -> endingOffsets
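
For illustration, a minimal sketch of how the new option is expected to be used from Scala; the bootstrap servers, topic name, and timestamp value are placeholders:

```scala
// Streaming read starting from a single timestamp applied to all
// subscribed topic-partitions (placeholder servers/topic/timestamp).
val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "topicA")
  // epoch millis as a string; takes precedence over
  // startingOffsetsByTimestamp and startingOffsets
  .option("startingTimestamp", "1621555200000")
  .load()
```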

### Why are the changes needed?

The existing option to specify a timestamp as an offset is quite verbose when there are many partitions across topics. If a topic has hundreds of partitions, the JSON has to repeat the same timestamp hundreds of times.

The number of partitions can also change, which requires either:

* fixing the code if the JSON is statically created, or
* introducing a dependency on the Kafka client and dealing with the Kafka API to craft the JSON programmatically.

Neither approach is acceptable for an ad-hoc query; nobody wants to write code more complicated than the query itself. Flink [provides the option](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-consumers-start-position-configuration) to specify a timestamp for all topic-partitions, as this PR does, and does not even provide an option to specify the timestamp per topic-partition.

With this PR, end users only need to provide a single timestamp value; there is no longer a complicated JSON format whose structure they need to learn.
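
To make the verbosity concrete, an illustrative sketch (hypothetical topic name and partition count) of crafting the per-partition JSON that the existing option requires, versus the single value the new option takes:

```scala
// Hypothetical: building the JSON that startingOffsetsByTimestamp requires
// when a topic has many partitions -- the same timestamp repeated per partition.
val ts = 1621555200000L
val json = (0 until 100)
  .map(p => s""""$p": $ts""")
  .mkString("""{"topicA": {""", ", ", "}}")
// e.g. {"topicA": {"0": 1621555200000, "1": 1621555200000, ...}}

// With this PR, the same intent becomes a single option value:
// .option("startingTimestamp", ts.toString)
```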

### Does this PR introduce _any_ user-facing change?

Yes, this PR introduces two new options, described in the section above.

The doc changes are as follows:

![Screenshot 2021-05-21 12.01.02 PM](https://user-images.githubusercontent.com/1317309/119076244-3034e680-ba2d-11eb-8323-0e227932d2e5.png)
![Screenshot 2021-05-21 12.01.12 PM](https://user-images.githubusercontent.com/1317309/119076255-35923100-ba2d-11eb-9d79-538a7f9ee738.png)
![Screenshot 2021-05-21 12.01.24 PM](https://user-images.githubusercontent.com/1317309/119076264-39be4e80-ba2d-11eb-8265-ac158f55c360.png)
![Screenshot 2021-05-21 12.06.01 PM](https://user-images.githubusercontent.com/1317309/119076271-3d51d580-ba2d-11eb-98ea-35fd72b1bbfc.png)

### How was this patch tested?

New UTs cover the new functionality. Also manually tested via simple batch & streaming queries.
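
For reference, a minimal sketch of the kind of batch query used for the manual check; the values are placeholders, not the actual test setup:

```scala
// Batch read bounded by a single starting and ending timestamp, each applied
// to all subscribed topic-partitions (placeholder servers/topic/timestamps).
val batch = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "topicA")
  .option("startingTimestamp", "1621555200000")
  .option("endingTimestamp", "1621558800000") // batch queries only
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
```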

Closes #32609 from HeartSaVioR/SPARK-29223-v2.

Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>

@ -362,24 +362,33 @@ The following configurations are optional:
<table class="table">
<tr><th>Option</th><th>value</th><th>default</th><th>query type</th><th>meaning</th></tr>
<tr>
<td>startingTimestamp</td>
<td>timestamp string e.g. "1000"</td>
<td>none (next preference is <code>startingOffsetsByTimestamp</code>)</td>
<td>streaming and batch</td>
<td>The start point in time when a query is started, a string specifying a starting timestamp for
all partitions in topics being subscribed. Please refer to the details on timestamp offset options below. If Kafka doesn't return the matched offset,
the query will fail immediately to prevent unintended reads from such partitions. (This is a current limitation and will be addressed in the near future.)<p/>
<p/>
Note1: <code>startingTimestamp</code> takes precedence over <code>startingOffsetsByTimestamp</code> and <code>startingOffsets</code>.<p/>
Note2: For streaming queries, this only applies when a new query is started, and that resuming will
always pick up from where the query left off. Newly discovered partitions during a query will start at
earliest.</td>
</tr>
<tr>
<td>startingOffsetsByTimestamp</td>
<td>json string
""" {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """
</td>
<td>none (the value of <code>startingOffsets</code> will apply)</td>
<td>none (next preference is <code>startingOffsets</code>)</td>
<td>streaming and batch</td>
<td>The start point of timestamp when a query is started, a json string specifying a starting timestamp for
each TopicPartition. The returned offset for each partition is the earliest offset whose timestamp is greater than or
equal to the given timestamp in the corresponding partition. If the matched offset doesn't exist,
each TopicPartition. Please refer to the details on timestamp offset options below. If Kafka doesn't return the matched offset,
the query will fail immediately to prevent unintended reads from such partitions. (This is a current limitation and will be addressed in the near future.)<p/>
<p/>
Spark simply passes the timestamp information to <code>KafkaConsumer.offsetsForTimes</code>, and doesn't interpret or reason about the value. <p/>
For more details on <code>KafkaConsumer.offsetsForTimes</code>, please refer <a href="https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#offsetsForTimes-java.util.Map-">javadoc</a> for details.<p/>
Also the meaning of <code>timestamp</code> here can be vary according to Kafka configuration (<code>log.message.timestamp.type</code>): please refer <a href="https://kafka.apache.org/documentation/">Kafka documentation</a> for further details.<p/>
Note: This option requires Kafka 0.10.1.0 or higher.<p/>
Note2: <code>startingOffsetsByTimestamp</code> takes precedence over <code>startingOffsets</code>.<p/>
Note3: For streaming queries, this only applies when a new query is started, and that resuming will
Note1: <code>startingOffsetsByTimestamp</code> takes precedence over <code>startingOffsets</code>.<p/>
Note2: For streaming queries, this only applies when a new query is started, and that resuming will
always pick up from where the query left off. Newly discovered partitions during a query will start at
earliest.</td>
</tr>
@ -398,23 +407,28 @@ The following configurations are optional:
always pick up from where the query left off. Newly discovered partitions during a query will start at
earliest.</td>
</tr>
<tr>
<td>endingTimestamp</td>
<td>timestamp string e.g. "1000"</td>
<td>none (next preference is <code>endingOffsetsByTimestamp</code>)</td>
<td>batch query</td>
<td>The end point when a batch query is ended, a string specifying an ending timestamp for
all partitions in topics being subscribed. Please refer to the details on timestamp offset options below.
If Kafka doesn't return the matched offset, the offset will be set to latest.<p/>
Note: <code>endingTimestamp</code> takes precedence over <code>endingOffsetsByTimestamp</code> and <code>endingOffsets</code>.<p/>
</td>
</tr>
<tr>
<td>endingOffsetsByTimestamp</td>
<td>json string
""" {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """
</td>
<td>latest</td>
<td>none (next preference is <code>endingOffsets</code>)</td>
<td>batch query</td>
<td>The end point when a batch query is ended, a json string specifying an ending timestamp for each TopicPartition.
The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to
the given timestamp in the corresponding partition. If the matched offset doesn't exist, the offset will
be set to latest.<p/>
<p/>
Spark simply passes the timestamp information to <code>KafkaConsumer.offsetsForTimes</code>, and doesn't interpret or reason about the value. <p/>
For more details on <code>KafkaConsumer.offsetsForTimes</code>, please refer <a href="https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#offsetsForTimes-java.util.Map-">javadoc</a> for details.<p/>
Also the meaning of <code>timestamp</code> here can be vary according to Kafka configuration (<code>log.message.timestamp.type</code>): please refer <a href="https://kafka.apache.org/documentation/">Kafka documentation</a> for further details.<p/>
Note: This option requires Kafka 0.10.1.0 or higher.<p/>
Note2: <code>endingOffsetsByTimestamp</code> takes precedence over <code>endingOffsets</code>.
Please refer to the details on timestamp offset options below. If Kafka doesn't return the matched offset,
the offset will be set to latest.<p/>
Note: <code>endingOffsetsByTimestamp</code> takes precedence over <code>endingOffsets</code>.
</td>
</tr>
<tr>
@ -512,6 +526,17 @@ The following configurations are optional:
</tr>
</table>
### Details on timestamp offset options
The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.
The behavior varies across options if Kafka doesn't return the matched offset - check the description of each option.
Spark simply passes the timestamp information to <code>KafkaConsumer.offsetsForTimes</code>, and doesn't interpret or reason about the value.
For more details on <code>KafkaConsumer.offsetsForTimes</code>, please refer to the <a href="http://kafka.apache.org/0101/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#offsetsForTimes(java.util.Map)">javadoc</a>.
Also, the meaning of <code>timestamp</code> here can vary according to the Kafka configuration (<code>log.message.timestamp.type</code>): please refer to the <a href="https://kafka.apache.org/documentation/">Kafka documentation</a> for further details.
Timestamp offset options require Kafka 0.10.1.0 or higher.
### Offset fetching
In Spark 3.0 and before Spark uses <code>KafkaConsumer</code> for offset fetching which could cause infinite wait in the driver.

@ -72,6 +72,8 @@ class KafkaContinuousStream(
case SpecificOffsetRangeLimit(p) => offsetReader.fetchSpecificOffsets(p, reportDataLoss)
case SpecificTimestampRangeLimit(p) => offsetReader.fetchSpecificTimestampBasedOffsets(p,
failsOnNoMatchingOffset = true)
case GlobalTimestampRangeLimit(ts) => offsetReader.fetchGlobalTimestampBasedOffsets(
ts, failsOnNoMatchingOffset = true)
}
logInfo(s"Initial offsets: $offsets")
offsets

@ -166,6 +166,8 @@ private[kafka010] class KafkaMicroBatchStream(
kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss)
case SpecificTimestampRangeLimit(p) =>
kafkaOffsetReader.fetchSpecificTimestampBasedOffsets(p, failsOnNoMatchingOffset = true)
case GlobalTimestampRangeLimit(ts) =>
kafkaOffsetReader.fetchGlobalTimestampBasedOffsets(ts, failsOnNoMatchingOffset = true)
}
metadataLog.add(0, offsets)
logInfo(s"Initial offsets: $offsets")

@ -49,6 +49,13 @@ private[kafka010] case class SpecificOffsetRangeLimit(
private[kafka010] case class SpecificTimestampRangeLimit(
topicTimestamps: Map[TopicPartition, Long]) extends KafkaOffsetRangeLimit
/**
* Represents the desire to bind to the earliest offset whose timestamp is equal to or greater
* than the specified timestamp. This applies the timestamp to all topics/partitions.
*/
private[kafka010] case class GlobalTimestampRangeLimit(
timestamp: Long) extends KafkaOffsetRangeLimit
private[kafka010] object KafkaOffsetRangeLimit {
/**
* Used to denote offset range limits that are resolved via Kafka

@ -77,6 +77,20 @@ private[kafka010] trait KafkaOffsetReader {
partitionTimestamps: Map[TopicPartition, Long],
failsOnNoMatchingOffset: Boolean): KafkaSourceOffset
/**
* Resolves offsets based on a single timestamp applied to all topic-partitions being subscribed.
* The returned offset for each partition is the earliest offset whose timestamp is greater
* than or equal to the given timestamp in the corresponding partition. If the matched offset
* doesn't exist, depending on `failsOnNoMatchingOffset` parameter, the offset will be set to
* latest or this method throws an error.
*
* @param timestamp the timestamp.
* @param failsOnNoMatchingOffset whether to fail the query when no matched offset can be found.
*/
def fetchGlobalTimestampBasedOffsets(
timestamp: Long,
failsOnNoMatchingOffset: Boolean): KafkaSourceOffset
/**
* Fetch the earliest offsets for the topic partitions that are indicated
* in the [[ConsumerStrategy]].

@ -138,6 +138,9 @@ private[kafka010] class KafkaOffsetReaderAdmin(
case SpecificTimestampRangeLimit(partitionTimestamps) =>
fetchSpecificTimestampBasedOffsets(partitionTimestamps,
failsOnNoMatchingOffset = isStartingOffsets).partitionToOffsets
case GlobalTimestampRangeLimit(timestamp) =>
fetchGlobalTimestampBasedOffsets(timestamp,
failsOnNoMatchingOffset = isStartingOffsets).partitionToOffsets
}
}
@ -193,6 +196,37 @@ private[kafka010] class KafkaOffsetReaderAdmin(
fetchSpecificOffsets0(fnAssertParametersWithPartitions, fnRetrievePartitionOffsets)
}
override def fetchGlobalTimestampBasedOffsets(
timestamp: Long,
failsOnNoMatchingOffset: Boolean): KafkaSourceOffset = {
val fnAssertParametersWithPartitions: ju.Set[TopicPartition] => Unit = { partitions =>
logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $timestamp")
}
val fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long] = { tps =>
val listOffsetsParams = tps.asScala.map { tp =>
tp -> OffsetSpec.forTimestamp(timestamp)
}.toMap.asJava
admin.listOffsets(listOffsetsParams, listOffsetsOptions).all().get().asScala.map {
case (tp, offsetSpec) =>
if (failsOnNoMatchingOffset) {
assert(offsetSpec.offset() != OffsetFetchResponse.INVALID_OFFSET, "No offset " +
s"matched from request of topic-partition $tp and timestamp " +
s"$timestamp.")
}
if (offsetSpec.offset() == OffsetFetchResponse.INVALID_OFFSET) {
tp -> KafkaOffsetRangeLimit.LATEST
} else {
tp -> offsetSpec.offset()
}
}.toMap
}
fetchSpecificOffsets0(fnAssertParametersWithPartitions, fnRetrievePartitionOffsets)
}
private def fetchSpecificOffsets0(
fnAssertParametersWithPartitions: ju.Set[TopicPartition] => Unit,
fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long]

@ -160,6 +160,9 @@ private[kafka010] class KafkaOffsetReaderConsumer(
case SpecificTimestampRangeLimit(partitionTimestamps) =>
fetchSpecificTimestampBasedOffsets(partitionTimestamps,
failsOnNoMatchingOffset = isStartingOffsets).partitionToOffsets
case GlobalTimestampRangeLimit(timestamp) =>
fetchGlobalTimestampBasedOffsets(timestamp,
failsOnNoMatchingOffset = isStartingOffsets).partitionToOffsets
}
}
@ -234,6 +237,39 @@ private[kafka010] class KafkaOffsetReaderConsumer(
fnAssertFetchedOffsets)
}
override def fetchGlobalTimestampBasedOffsets(
timestamp: Long,
failsOnNoMatchingOffset: Boolean): KafkaSourceOffset = {
val fnAssertParametersWithPartitions: ju.Set[TopicPartition] => Unit = { partitions =>
logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $timestamp")
}
val fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long] = { tps =>
val converted = tps.asScala.map(_ -> java.lang.Long.valueOf(timestamp)).toMap.asJava
val offsetForTime: ju.Map[TopicPartition, OffsetAndTimestamp] =
consumer.offsetsForTimes(converted)
offsetForTime.asScala.map { case (tp, offsetAndTimestamp) =>
if (failsOnNoMatchingOffset) {
assert(offsetAndTimestamp != null, "No offset matched from request of " +
s"topic-partition $tp and timestamp $timestamp.")
}
if (offsetAndTimestamp == null) {
tp -> KafkaOffsetRangeLimit.LATEST
} else {
tp -> offsetAndTimestamp.offset()
}
}.toMap
}
val fnAssertFetchedOffsets: Map[TopicPartition, Long] => Unit = { _ => }
fetchSpecificOffsets0(fnAssertParametersWithPartitions, fnRetrievePartitionOffsets,
fnAssertFetchedOffsets)
}
private def fetchSpecificOffsets0(
fnAssertParametersWithPartitions: ju.Set[TopicPartition] => Unit,
fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long],

@ -104,6 +104,8 @@ private[kafka010] class KafkaSource(
case SpecificOffsetRangeLimit(p) => kafkaReader.fetchSpecificOffsets(p, reportDataLoss)
case SpecificTimestampRangeLimit(p) =>
kafkaReader.fetchSpecificTimestampBasedOffsets(p, failsOnNoMatchingOffset = true)
case GlobalTimestampRangeLimit(ts) =>
kafkaReader.fetchGlobalTimestampBasedOffsets(ts, failsOnNoMatchingOffset = true)
}
metadataLog.add(0, offsets)
logInfo(s"Initial offsets: $offsets")

@ -90,8 +90,9 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
val specifiedKafkaParams = convertToSpecifiedParams(caseInsensitiveParameters)
val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(
caseInsensitiveParameters, STARTING_OFFSETS_BY_TIMESTAMP_OPTION_KEY,
STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
caseInsensitiveParameters, STARTING_TIMESTAMP_OPTION_KEY,
STARTING_OFFSETS_BY_TIMESTAMP_OPTION_KEY, STARTING_OFFSETS_OPTION_KEY,
LatestOffsetRangeLimit)
val kafkaOffsetReader = KafkaOffsetReader.build(
strategy(caseInsensitiveParameters),
@ -128,13 +129,15 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
val specifiedKafkaParams = convertToSpecifiedParams(caseInsensitiveParameters)
val startingRelationOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(
caseInsensitiveParameters, STARTING_OFFSETS_BY_TIMESTAMP_OPTION_KEY,
STARTING_OFFSETS_OPTION_KEY, EarliestOffsetRangeLimit)
caseInsensitiveParameters, STARTING_TIMESTAMP_OPTION_KEY,
STARTING_OFFSETS_BY_TIMESTAMP_OPTION_KEY, STARTING_OFFSETS_OPTION_KEY,
EarliestOffsetRangeLimit)
assert(startingRelationOffsets != LatestOffsetRangeLimit)
val endingRelationOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(
caseInsensitiveParameters, ENDING_OFFSETS_BY_TIMESTAMP_OPTION_KEY,
ENDING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
caseInsensitiveParameters, ENDING_TIMESTAMP_OPTION_KEY,
ENDING_OFFSETS_BY_TIMESTAMP_OPTION_KEY, ENDING_OFFSETS_OPTION_KEY,
LatestOffsetRangeLimit)
assert(endingRelationOffsets != EarliestOffsetRangeLimit)
val includeHeaders = caseInsensitiveParameters.getOrElse(INCLUDE_HEADERS, "false").toBoolean
@ -334,8 +337,8 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
private def validateBatchOptions(params: CaseInsensitiveMap[String]) = {
// Batch specific options
KafkaSourceProvider.getKafkaOffsetRangeLimit(
params, STARTING_OFFSETS_BY_TIMESTAMP_OPTION_KEY, STARTING_OFFSETS_OPTION_KEY,
EarliestOffsetRangeLimit) match {
params, STARTING_TIMESTAMP_OPTION_KEY, STARTING_OFFSETS_BY_TIMESTAMP_OPTION_KEY,
STARTING_OFFSETS_OPTION_KEY, EarliestOffsetRangeLimit) match {
case EarliestOffsetRangeLimit => // good to go
case LatestOffsetRangeLimit =>
throw new IllegalArgumentException("starting offset can't be latest " +
@ -348,11 +351,12 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
case _ => // ignore
}
case _: SpecificTimestampRangeLimit => // good to go
case _: GlobalTimestampRangeLimit => // good to go
}
KafkaSourceProvider.getKafkaOffsetRangeLimit(
params, ENDING_OFFSETS_BY_TIMESTAMP_OPTION_KEY, ENDING_OFFSETS_OPTION_KEY,
LatestOffsetRangeLimit) match {
params, ENDING_TIMESTAMP_OPTION_KEY, ENDING_OFFSETS_BY_TIMESTAMP_OPTION_KEY,
ENDING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit) match {
case EarliestOffsetRangeLimit =>
throw new IllegalArgumentException("ending offset can't be earliest " +
"for batch queries on Kafka")
@ -365,6 +369,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
case _ => // ignore
}
case _: SpecificTimestampRangeLimit => // good to go
case _: GlobalTimestampRangeLimit => // good to go
}
validateGeneralOptions(params)
@ -421,12 +426,14 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
val specifiedKafkaParams = convertToSpecifiedParams(caseInsensitiveOptions)
val startingRelationOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(
caseInsensitiveOptions, STARTING_OFFSETS_BY_TIMESTAMP_OPTION_KEY,
STARTING_OFFSETS_OPTION_KEY, EarliestOffsetRangeLimit)
caseInsensitiveOptions, STARTING_TIMESTAMP_OPTION_KEY,
STARTING_OFFSETS_BY_TIMESTAMP_OPTION_KEY, STARTING_OFFSETS_OPTION_KEY,
EarliestOffsetRangeLimit)
val endingRelationOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(
caseInsensitiveOptions, ENDING_OFFSETS_BY_TIMESTAMP_OPTION_KEY,
ENDING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
caseInsensitiveOptions, ENDING_TIMESTAMP_OPTION_KEY,
ENDING_OFFSETS_BY_TIMESTAMP_OPTION_KEY, ENDING_OFFSETS_OPTION_KEY,
LatestOffsetRangeLimit)
new KafkaBatch(
strategy(caseInsensitiveOptions),
@ -449,8 +456,9 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
val specifiedKafkaParams = convertToSpecifiedParams(caseInsensitiveOptions)
val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(
caseInsensitiveOptions, STARTING_OFFSETS_BY_TIMESTAMP_OPTION_KEY,
STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
caseInsensitiveOptions, STARTING_TIMESTAMP_OPTION_KEY,
STARTING_OFFSETS_BY_TIMESTAMP_OPTION_KEY, STARTING_OFFSETS_OPTION_KEY,
LatestOffsetRangeLimit)
val kafkaOffsetReader = KafkaOffsetReader.build(
strategy(caseInsensitiveOptions),
@ -478,8 +486,9 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
val specifiedKafkaParams = convertToSpecifiedParams(caseInsensitiveOptions)
val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(
caseInsensitiveOptions, STARTING_OFFSETS_BY_TIMESTAMP_OPTION_KEY,
STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
caseInsensitiveOptions, STARTING_TIMESTAMP_OPTION_KEY,
STARTING_OFFSETS_BY_TIMESTAMP_OPTION_KEY, STARTING_OFFSETS_OPTION_KEY,
LatestOffsetRangeLimit)
val kafkaOffsetReader = KafkaOffsetReader.build(
strategy(caseInsensitiveOptions),
@ -521,6 +530,8 @@ private[kafka010] object KafkaSourceProvider extends Logging {
private[kafka010] val ENDING_OFFSETS_OPTION_KEY = "endingoffsets"
private[kafka010] val STARTING_OFFSETS_BY_TIMESTAMP_OPTION_KEY = "startingoffsetsbytimestamp"
private[kafka010] val ENDING_OFFSETS_BY_TIMESTAMP_OPTION_KEY = "endingoffsetsbytimestamp"
private[kafka010] val STARTING_TIMESTAMP_OPTION_KEY = "startingtimestamp"
private[kafka010] val ENDING_TIMESTAMP_OPTION_KEY = "endingtimestamp"
private val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss"
private[kafka010] val MIN_PARTITIONS_OPTION_KEY = "minpartitions"
private[kafka010] val MAX_OFFSET_PER_TRIGGER = "maxoffsetspertrigger"
@ -564,20 +575,36 @@ private[kafka010] object KafkaSourceProvider extends Logging {
def getKafkaOffsetRangeLimit(
params: CaseInsensitiveMap[String],
globalOffsetTimestampOptionKey: String,
offsetByTimestampOptionKey: String,
offsetOptionKey: String,
defaultOffsets: KafkaOffsetRangeLimit): KafkaOffsetRangeLimit = {
params.get(offsetByTimestampOptionKey).map(_.trim) match {
case Some(json) => SpecificTimestampRangeLimit(JsonUtils.partitionTimestamps(json))
case None =>
params.get(offsetOptionKey).map(_.trim) match {
case Some(offset) if offset.toLowerCase(Locale.ROOT) == "latest" =>
LatestOffsetRangeLimit
case Some(offset) if offset.toLowerCase(Locale.ROOT) == "earliest" =>
EarliestOffsetRangeLimit
case Some(json) => SpecificOffsetRangeLimit(JsonUtils.partitionOffsets(json))
case None => defaultOffsets
}
// The order below represents "preferences"
if (params.contains(globalOffsetTimestampOptionKey)) {
// 1. global timestamp
val tsStr = params(globalOffsetTimestampOptionKey).trim
try {
val ts = tsStr.toLong
GlobalTimestampRangeLimit(ts)
} catch {
case _: NumberFormatException =>
throw new IllegalArgumentException(s"Expected a single long value, got $tsStr")
}
} else if (params.contains(offsetByTimestampOptionKey)) {
// 2. timestamp per topic partition
val json = params(offsetByTimestampOptionKey).trim
SpecificTimestampRangeLimit(JsonUtils.partitionTimestamps(json))
} else {
// 3. latest/earliest/offset
params.get(offsetOptionKey).map(_.trim) match {
case Some(offset) if offset.toLowerCase(Locale.ROOT) == "latest" =>
LatestOffsetRangeLimit
case Some(offset) if offset.toLowerCase(Locale.ROOT) == "earliest" =>
EarliestOffsetRangeLimit
case Some(json) => SpecificOffsetRangeLimit(JsonUtils.partitionOffsets(json))
case None => defaultOffsets
}
}
}

@ -1470,6 +1470,16 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
"failOnDataLoss" -> failOnDataLoss.toString)
}
test(s"assign from global timestamp per topic (failOnDataLoss: $failOnDataLoss)") {
val topic = newTopic()
testFromGlobalTimestamp(
topic,
failOnDataLoss = failOnDataLoss,
addPartitions = false,
"assign" -> assignString(topic, 0 to 4),
"failOnDataLoss" -> failOnDataLoss.toString)
}
test(s"subscribing topic by name from latest offsets (failOnDataLoss: $failOnDataLoss)") {
val topic = newTopic()
testFromLatestOffsets(
@ -1499,6 +1509,13 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
"subscribe" -> topic)
}
test(s"subscribing topic by name from global timestamp per topic" +
s" (failOnDataLoss: $failOnDataLoss)") {
val topic = newTopic()
testFromGlobalTimestamp(topic, failOnDataLoss = failOnDataLoss, addPartitions = true,
"subscribe" -> topic)
}
test(s"subscribing topic by pattern from latest offsets (failOnDataLoss: $failOnDataLoss)") {
val topicPrefix = newTopic()
val topic = topicPrefix + "-suffix"
@ -1538,6 +1555,17 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
addPartitions = true,
"subscribePattern" -> s"$topicPrefix-.*")
}
test(s"subscribing topic by pattern from global timestamp per topic " +
s"(failOnDataLoss: $failOnDataLoss)") {
val topicPrefix = newTopic()
val topic = topicPrefix + "-suffix"
testFromGlobalTimestamp(
topic,
failOnDataLoss = failOnDataLoss,
addPartitions = true,
"subscribePattern" -> s"$topicPrefix-.*")
}
}
test("bad source options") {
@ -1608,7 +1636,7 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
(STARTING_OFFSETS_OPTION_KEY, """{"topic-A":{"0":23}}""",
SpecificOffsetRangeLimit(Map(new TopicPartition("topic-A", 0) -> 23))))) {
val offset = getKafkaOffsetRangeLimit(
CaseInsensitiveMap[String](Map(optionKey -> optionValue)), "dummy", optionKey,
CaseInsensitiveMap[String](Map(optionKey -> optionValue)), "dummy", "dummy", optionKey,
answer)
assert(offset === answer)
}
@ -1617,7 +1645,7 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
(STARTING_OFFSETS_OPTION_KEY, EarliestOffsetRangeLimit),
(ENDING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit))) {
val offset = getKafkaOffsetRangeLimit(
CaseInsensitiveMap[String](Map.empty), "dummy", optionKey, answer)
CaseInsensitiveMap[String](Map.empty), "dummy", "dummy", optionKey, answer)
assert(offset === answer)
}
}
@ -1687,27 +1715,11 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
failOnDataLoss: Boolean,
addPartitions: Boolean,
options: (String, String)*): Unit = {
def sendMessages(topic: String, msgs: Seq[String], part: Int, ts: Long): Unit = {
val records = msgs.map { msg =>
new RecordBuilder(topic, msg).partition(part).timestamp(ts).build()
}
testUtils.sendMessages(records)
}
testUtils.createTopic(topic, partitions = 5)
val firstTimestamp = System.currentTimeMillis() - 5000
sendMessages(topic, Array(-20).map(_.toString), 0, firstTimestamp)
sendMessages(topic, Array(-10).map(_.toString), 1, firstTimestamp)
sendMessages(topic, Array(0, 1).map(_.toString), 2, firstTimestamp)
sendMessages(topic, Array(10, 11).map(_.toString), 3, firstTimestamp)
sendMessages(topic, Array(20, 21, 22).map(_.toString), 4, firstTimestamp)
val secondTimestamp = firstTimestamp + 1000
sendMessages(topic, Array(-21, -22).map(_.toString), 0, secondTimestamp)
sendMessages(topic, Array(-11, -12).map(_.toString), 1, secondTimestamp)
sendMessages(topic, Array(2).map(_.toString), 2, secondTimestamp)
sendMessages(topic, Array(12).map(_.toString), 3, secondTimestamp)
setupTestMessagesForTestOnTimestampOffsets(topic, firstTimestamp, secondTimestamp)
// no data after second timestamp for partition 4
require(testUtils.getLatestOffsets(Set(topic)).size === 5)
@ -1719,18 +1731,8 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
) ++ Map(new TopicPartition(topic, 4) -> firstTimestamp)
val startingTimestamps = JsonUtils.partitionTimestamps(startPartitionTimestamps)
val reader = spark
.readStream
.format("kafka")
.option("startingOffsetsByTimestamp", startingTimestamps)
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("kafka.metadata.max.age.ms", "1")
.option("failOnDataLoss", failOnDataLoss.toString)
options.foreach { case (k, v) => reader.option(k, v) }
val kafka = reader.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
val mapped = setupDataFrameForTestOnTimestampOffsets(startingTimestamps, failOnDataLoss,
options: _*)
testStream(mapped)(
makeSureGetOffsetCalled,
@ -1758,6 +1760,123 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
)
}
private def testFromGlobalTimestamp(
topic: String,
failOnDataLoss: Boolean,
addPartitions: Boolean,
options: (String, String)*): Unit = {
testUtils.createTopic(topic, partitions = 5)
val firstTimestamp = System.currentTimeMillis() - 5000
val secondTimestamp = firstTimestamp + 1000
setupTestMessagesForTestOnTimestampOffsets(topic, firstTimestamp, secondTimestamp)
// here we should add records to partition 4 that match the second timestamp,
// as the query will break if there are no matching records
sendMessagesWithTimestamp(topic, Array(23, 24).map(_.toString), 4, secondTimestamp)
require(testUtils.getLatestOffsets(Set(topic)).size === 5)
// we intentionally start from the second timestamp for all partitions
// via the global timestamp option
val mapped = setupDataFrameForTestOnGlobalTimestamp(secondTimestamp, failOnDataLoss,
options: _*)
testStream(mapped)(
makeSureGetOffsetCalled,
Execute { q =>
// wait to reach the last offset in every partition
val partAndOffsets = (0 to 4).map(new TopicPartition(topic, _)).map { tp =>
if (tp.partition() < 4) {
tp -> 3L
} else {
tp -> 5L // we added 2 more records to partition 4
}
}.toMap
q.awaitOffset(0, KafkaSourceOffset(partAndOffsets), streamingTimeout.toMillis)
},
CheckAnswer(-21, -22, -11, -12, 2, 12, 23, 24),
StopStream,
StartStream(),
CheckAnswer(-21, -22, -11, -12, 2, 12, 23, 24), // Should get the data back on recovery
StopStream,
AddKafkaData(Set(topic), 30, 31, 32), // Add data when stream is stopped
StartStream(),
CheckAnswer(-21, -22, -11, -12, 2, 12, 23, 24, 30, 31, 32), // Should get the added data
AssertOnQuery("Add partitions") { query: StreamExecution =>
if (addPartitions) setTopicPartitions(topic, 10, query)
true
},
AddKafkaData(Set(topic), 40, 41, 42, 43, 44)(ensureDataInMultiplePartition = true),
CheckAnswer(-21, -22, -11, -12, 2, 12, 23, 24, 30, 31, 32, 40, 41, 42, 43, 44),
StopStream
)
}
private def sendMessagesWithTimestamp(
topic: String,
msgs: Seq[String],
part: Int,
ts: Long): Unit = {
val records = msgs.map { msg =>
new RecordBuilder(topic, msg).partition(part).timestamp(ts).build()
}
testUtils.sendMessages(records)
}
private def setupTestMessagesForTestOnTimestampOffsets(
topic: String,
firstTimestamp: Long,
secondTimestamp: Long): Unit = {
sendMessagesWithTimestamp(topic, Array(-20).map(_.toString), 0, firstTimestamp)
sendMessagesWithTimestamp(topic, Array(-10).map(_.toString), 1, firstTimestamp)
sendMessagesWithTimestamp(topic, Array(0, 1).map(_.toString), 2, firstTimestamp)
sendMessagesWithTimestamp(topic, Array(10, 11).map(_.toString), 3, firstTimestamp)
sendMessagesWithTimestamp(topic, Array(20, 21, 22).map(_.toString), 4, firstTimestamp)
sendMessagesWithTimestamp(topic, Array(-21, -22).map(_.toString), 0, secondTimestamp)
sendMessagesWithTimestamp(topic, Array(-11, -12).map(_.toString), 1, secondTimestamp)
sendMessagesWithTimestamp(topic, Array(2).map(_.toString), 2, secondTimestamp)
sendMessagesWithTimestamp(topic, Array(12).map(_.toString), 3, secondTimestamp)
// no data after second timestamp for partition 4
}
private def setupDataFrameForTestOnTimestampOffsets(
startingTimestamps: String,
failOnDataLoss: Boolean,
options: (String, String)*): Dataset[_] = {
val reader = spark
.readStream
.format("kafka")
.option("startingOffsetsByTimestamp", startingTimestamps)
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("kafka.metadata.max.age.ms", "1")
.option("failOnDataLoss", failOnDataLoss.toString)
options.foreach { case (k, v) => reader.option(k, v) }
val kafka = reader.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
mapped
}
private def setupDataFrameForTestOnGlobalTimestamp(
startingTimestamp: Long,
failOnDataLoss: Boolean,
options: (String, String)*): Dataset[_] = {
val reader = spark
.readStream
.format("kafka")
.option("startingTimestamp", startingTimestamp)
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("kafka.metadata.max.age.ms", "1")
.option("failOnDataLoss", failOnDataLoss.toString)
options.foreach { case (k, v) => reader.option(k, v) }
val kafka = reader.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
mapped
}
test("Kafka column types") {
val now = System.currentTimeMillis()
val topic = newTopic()

@ -263,6 +263,15 @@ abstract class KafkaRelationSuiteBase extends QueryTest with SharedSparkSession
}, topic, 0 to 19)
}
test("global timestamp provided for starting and ending") {
val (topic, timestamps) = prepareTimestampRelatedUnitTest
// both timestamps presented: starting "second", ending "third"
verifyTimestampRelatedQueryResult({ df =>
df.option("startingTimestamp", timestamps(1)).option("endingTimestamp", timestamps(2))
}, topic, 10 to 19)
}
test("no matched offset for timestamp - startingOffsets") {
val (topic, timestamps) = prepareTimestampRelatedUnitTest
@ -284,6 +293,44 @@ abstract class KafkaRelationSuiteBase extends QueryTest with SharedSparkSession
TestUtils.assertExceptionMsg(e, "No offset matched from request")
}
test("preferences on offset related options") {
val (topic, timestamps) = prepareTimestampRelatedUnitTest
/*
The test will set the three configs differently:
* global timestamp
starting only presented as "third", and ending not presented
* specific timestamp for partition
starting only presented as "second", and ending not presented
* offsets
starting only presented as "earliest", and ending not presented
The preference goes to global timestamp -> timestamp for partition -> offsets
*/
val startTopicTimestamps = Map(
(0 to 2).map(new TopicPartition(topic, _) -> timestamps(1)): _*)
val startingTimestamps = JsonUtils.partitionTimestamps(startTopicTimestamps)
// all options are specified: global timestamp
verifyTimestampRelatedQueryResult({ df =>
df
.option("startingTimestamp", timestamps(2))
.option("startingOffsetsByTimestamp", startingTimestamps)
.option("startingOffsets", "earliest")
}, topic, 20 to 29)
// timestamp for partition and offsets are specified: timestamp for partition
verifyTimestampRelatedQueryResult({ df =>
df
.option("startingOffsetsByTimestamp", startingTimestamps)
.option("startingOffsets", "earliest")
}, topic, 10 to 29)
}
test("no matched offset for timestamp - endingOffsets") {
val (topic, timestamps) = prepareTimestampRelatedUnitTest