diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md index 6926bbbb2b..0ec359f800 100644 --- a/docs/structured-streaming-kafka-integration.md +++ b/docs/structured-streaming-kafka-integration.md @@ -477,23 +477,26 @@ The following configurations are optional: maxOffsetsPerTrigger long none - streaming and batch + streaming query Rate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume. minOffsetsPerTrigger long none - streaming and batch - Minimum number of offsets to be processed per trigger interval. The specified total number of offsets will - be proportionally split across topicPartitions of different volume. + streaming query + Minimum number of offsets to be processed per trigger interval. The specified total number of + offsets will be proportionally split across topicPartitions of different volume. Note, if the + maxTriggerDelay is exceeded, a trigger will be fired even if the number of available offsets + doesn't reach minOffsetsPerTrigger. maxTriggerDelay time with units 15m - streaming and batch - Maximum amount of time for which trigger can be delayed between two triggers provided some data is available from the source. + streaming query + Maximum amount of time for which trigger can be delayed between two triggers provided some + data is available from the source. This option is only applicable if minOffsetsPerTrigger is set. 
minPartitions diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala index 38803b7825..4a75ab0949 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala @@ -322,6 +322,15 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister s"Option 'kafka.${ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG}' must be specified for " + s"configuring Kafka consumer") } + + if (params.contains(MIN_OFFSET_PER_TRIGGER) && params.contains(MAX_OFFSET_PER_TRIGGER)) { + val minOffsets = params.get(MIN_OFFSET_PER_TRIGGER).get.toLong + val maxOffsets = params.get(MAX_OFFSET_PER_TRIGGER).get.toLong + if (minOffsets > maxOffsets) { + throw new IllegalArgumentException(s"The value of minOffsetsPerTrigger($minOffsets) is " + + s"higher than the maxOffsetsPerTrigger($maxOffsets).") + } + } } private def validateStreamOptions(params: CaseInsensitiveMap[String]) = { @@ -382,6 +391,10 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister if (params.contains(MIN_OFFSET_PER_TRIGGER)) { logWarning("minOffsetsPerTrigger option ignored in batch queries") } + + if (params.contains(MAX_TRIGGER_DELAY)) { + logWarning("maxTriggerDelay option ignored in batch queries") + } } class KafkaTable(includeHeaders: Boolean) extends Table with SupportsRead with SupportsWrite { diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index d9fad5e84f..f61696f648 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -1908,6 +1908,10 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest { testBadOptions("assign" -> "")("no topicpartitions to assign") testBadOptions("subscribe" -> "")("no topics to subscribe") testBadOptions("subscribePattern" -> "")("pattern to subscribe is empty") + testBadOptions( + "kafka.bootstrap.servers" -> "fake", "subscribe" -> "t", "minOffsetsPerTrigger" -> "20", + "maxOffsetsPerTrigger" -> "15")( + "value of minOffsetsPerTrigger(20) is higher than the maxOffsetsPerTrigger(15)") } test("unsupported kafka configs") {