[SPARK-35312][SS][FOLLOW-UP] More documents and checking logic for the new options

### What changes were proposed in this pull request?
Add more documentation and checking logic for the new options `minOffsetsPerTrigger` and `maxTriggerDelay`.

### Why are the changes needed?
To provide a clear description of the behavior introduced in SPARK-35312.

### Does this PR introduce _any_ user-facing change?
Yes.
If the user sets `minOffsetsPerTrigger` > `maxOffsetsPerTrigger`, the new code throws an IllegalArgumentException. Previously, `maxOffsetsPerTrigger` was silently ignored.
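
For illustration, here is a minimal sketch of a configuration that the new check rejects. This is not part of the patch; the broker address, topic name, and offset values are hypothetical, and the exact point at which the failure surfaces (query definition vs. query start) depends on the code path.

```scala
import org.apache.spark.sql.SparkSession

object MinMaxOffsetsCheckExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("min-max-offsets-check")
      .master("local[2]")
      .getOrCreate()

    // Hypothetical values: minOffsetsPerTrigger (200) > maxOffsetsPerTrigger (100).
    // With this follow-up, option validation rejects the combination instead of
    // silently ignoring maxOffsetsPerTrigger.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // hypothetical broker
      .option("subscribe", "topic1")                        // hypothetical topic
      .option("minOffsetsPerTrigger", "200")
      .option("maxOffsetsPerTrigger", "100")
      .load()
    // Expected to fail during option validation with a message like:
    //   The value of minOffsetPerTrigger(200) is higher than the maxOffsetsPerTrigger(100).

    spark.stop()
  }
}
```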

### How was this patch tested?
Existing tests.

Closes #33792 from xuanyuanking/SPARK-35312-follow.

Authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>

@@ -477,23 +477,26 @@ The following configurations are optional:
   <td>maxOffsetsPerTrigger</td>
   <td>long</td>
   <td>none</td>
-  <td>streaming and batch</td>
+  <td>streaming query</td>
   <td>Rate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume.</td>
 </tr>
 <tr>
   <td>minOffsetsPerTrigger</td>
   <td>long</td>
   <td>none</td>
-  <td>streaming and batch</td>
-  <td>Minimum number of offsets to be processed per trigger interval. The specified total number of offsets will
-  be proportionally split across topicPartitions of different volume.</td>
+  <td>streaming query</td>
+  <td>Minimum number of offsets to be processed per trigger interval. The specified total number of
+  offsets will be proportionally split across topicPartitions of different volume. Note, if the
+  maxTriggerDelay is exceeded, a trigger will be fired even if the number of available offsets
+  doesn't reach minOffsetsPerTrigger.</td>
 </tr>
 <tr>
   <td>maxTriggerDelay</td>
   <td>time with units</td>
   <td>15m</td>
-  <td>streaming and batch</td>
-  <td>Maximum amount of time for which trigger can be delayed between two triggers provided some data is available from the source.</td>
+  <td>streaming query</td>
+  <td>Maximum amount of time for which trigger can be delayed between two triggers provided some
+  data is available from the source. This option is only applicable if minOffsetsPerTrigger is set.</td>
 </tr>
 <tr>
   <td>minPartitions</td>
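
As a usage sketch of the documented behavior above (assuming a SparkSession `spark` is in scope; the broker, topic, and option values are hypothetical): a micro-batch fires once at least 500 new offsets are available, or once 10 minutes have passed since the last trigger even if fewer offsets are available.

```scala
// Hypothetical sketch: trigger when >= 500 new offsets are available, or after a
// 10 minute delay since the last trigger even if fewer offsets have arrived.
val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // hypothetical broker
  .option("subscribe", "events")                        // hypothetical topic
  .option("minOffsetsPerTrigger", "500")
  .option("maxTriggerDelay", "10m")       // only applies because minOffsetsPerTrigger is set
  .option("maxOffsetsPerTrigger", "5000") // optional cap; must be >= minOffsetsPerTrigger
  .load()
```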


@@ -322,6 +322,15 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
         s"Option 'kafka.${ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG}' must be specified for " +
           s"configuring Kafka consumer")
     }
+
+    if (params.contains(MIN_OFFSET_PER_TRIGGER) && params.contains(MAX_OFFSET_PER_TRIGGER)) {
+      val minOffsets = params.get(MIN_OFFSET_PER_TRIGGER).get.toLong
+      val maxOffsets = params.get(MAX_OFFSET_PER_TRIGGER).get.toLong
+      if (minOffsets > maxOffsets) {
+        throw new IllegalArgumentException(s"The value of minOffsetPerTrigger($minOffsets) is " +
+          s"higher than the maxOffsetsPerTrigger($maxOffsets).")
+      }
+    }
   }

   private def validateStreamOptions(params: CaseInsensitiveMap[String]) = {
@@ -382,6 +391,10 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     if (params.contains(MIN_OFFSET_PER_TRIGGER)) {
       logWarning("minOffsetsPerTrigger option ignored in batch queries")
     }
+
+    if (params.contains(MAX_TRIGGER_DELAY)) {
+      logWarning("maxTriggerDelay option ignored in batch queries")
+    }
   }

   class KafkaTable(includeHeaders: Boolean) extends Table with SupportsRead with SupportsWrite {


@@ -1908,6 +1908,10 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
     testBadOptions("assign" -> "")("no topicpartitions to assign")
     testBadOptions("subscribe" -> "")("no topics to subscribe")
     testBadOptions("subscribePattern" -> "")("pattern to subscribe is empty")
+    testBadOptions(
+      "kafka.bootstrap.servers" -> "fake", "subscribe" -> "t", "minOffsetsPerTrigger" -> "20",
+      "maxOffsetsPerTrigger" -> "15")(
+      "value of minOffsetPerTrigger(20) is higher than the maxOffsetsPerTrigger(15)")
   }

   test("unsupported kafka configs") {