[SPARK-33962][SS] Fix incorrect min partition condition

### What changes were proposed in this pull request?

This patch fixes an incorrect condition when comparing offset range size and min partition config.

### Why are the changes needed?

When calculating offset ranges, we consider `minPartitions` configuration. If `minPartitions` is not set or is less than or equal the size of given ranges, it means there are enough partitions at Kafka so we don't need to split offsets to satisfy min partition requirement. But the current condition is `offsetRanges.size > minPartitions.get` and is not correct. Currently `getRanges` will split offsets in unnecessary case.

Besides, in non-split case, we can assign preferred executor location and reuse `KafkaConsumer`. So unnecessary splitting offset range will miss the chance to reuse `KafkaConsumer`.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit test.

Manual test in Spark cluster with Kafka.

Closes #30994 from viirya/ss-minor4.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
This commit is contained in:
Liang-Chi Hsieh 2021-01-03 01:29:12 -08:00 committed by Dongjoon Hyun
parent fc7d0165d2
commit cfd4a08398
2 changed files with 15 additions and 1 deletions

View file

@ -46,7 +46,7 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int
val offsetRanges = ranges.filter(_.size > 0)
// If minPartitions not set or there are enough partitions to satisfy minPartitions
if (minPartitions.isEmpty || offsetRanges.size > minPartitions.get) {
if (minPartitions.isEmpty || offsetRanges.size >= minPartitions.get) {
// Assign preferred executor locations to each range such that the same topic-partition is
// preferentially read from the same executor and the KafkaConsumer can be reused.
offsetRanges.map { range =>

View file

@ -71,6 +71,20 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite {
KafkaOffsetRange(tp3, 1, 2, None)))
}
testWithMinPartitions("N TopicPartitions to N offset ranges with executors", 3) { calc =>
assert(
calc.getRanges(
Seq(
KafkaOffsetRange(tp1, 1, 2),
KafkaOffsetRange(tp2, 1, 2),
KafkaOffsetRange(tp3, 1, 2)),
Seq("exec1", "exec2", "exec3")) ===
Seq(
KafkaOffsetRange(tp1, 1, 2, Some("exec3")),
KafkaOffsetRange(tp2, 1, 2, Some("exec1")),
KafkaOffsetRange(tp3, 1, 2, Some("exec2"))))
}
testWithMinPartitions("1 TopicPartition to N offset ranges", 4) { calc =>
assert(
calc.getRanges(