[SPARK-26350][FOLLOWUP] Add actual verification on new UT introduced on SPARK-26350

## What changes were proposed in this pull request?

This patch adds a check to verify that the consumer group id is set correctly when a custom group id is provided via the Kafka parameters.

## How was this patch tested?

Modified UT.

Closes #23544 from HeartSaVioR/SPARK-26350-follow-up-actual-verification-on-UT.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
This commit is contained in:
Jungtaek Lim (HeartSaVioR) 2019-01-15 14:21:51 -08:00 committed by Shixiong Zhu
parent 954ef96c49
commit 2ebb79b2a6
No known key found for this signature in database
GPG key ID: ECB1BBEA55295E39
3 changed files with 29 additions and 4 deletions

View file

@@ -28,6 +28,7 @@ import scala.collection.JavaConverters._
import scala.io.Source
import scala.util.Random
import org.apache.kafka.clients.admin.{AdminClient, ConsumerGroupListing}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.TopicPartition
import org.scalatest.concurrent.PatienceConfiguration.Timeout
@@ -638,10 +639,11 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
testUtils.sendMessages(topic, (11 to 20).map(_.toString).toArray, Some(1))
testUtils.sendMessages(topic, (21 to 30).map(_.toString).toArray, Some(2))
val customGroupId = "id-" + Random.nextInt()
val dsKafka = spark
.readStream
.format("kafka")
.option("kafka.group.id", "id-" + Random.nextInt())
.option("kafka.group.id", customGroupId)
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", topic)
.option("startingOffsets", "earliest")
@@ -652,7 +654,15 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
testStream(dsKafka)(
makeSureGetOffsetCalled,
CheckAnswer(1 to 30: _*)
CheckAnswer(1 to 30: _*),
Execute { _ =>
val consumerGroups = testUtils.listConsumerGroups()
val validGroups = consumerGroups.valid().get()
val validGroupsId = validGroups.asScala.map(_.groupId())
assert(validGroupsId.exists(_ === customGroupId), "Valid consumer groups don't " +
s"contain the expected group id - Valid consumer groups: $validGroupsId / " +
s"expected group id: $customGroupId")
}
)
}

View file

@@ -20,6 +20,9 @@ package org.apache.spark.sql.kafka010
import java.util.Locale
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConverters._
import scala.util.Random
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
@@ -247,8 +250,16 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest
testUtils.sendMessages(topic, (11 to 20).map(_.toString).toArray, Some(1))
testUtils.sendMessages(topic, (21 to 30).map(_.toString).toArray, Some(2))
val df = createDF(topic, withOptions = Map("kafka.group.id" -> "custom"))
val customGroupId = "id-" + Random.nextInt()
val df = createDF(topic, withOptions = Map("kafka.group.id" -> customGroupId))
checkAnswer(df, (1 to 30).map(_.toString).toDF())
val consumerGroups = testUtils.listConsumerGroups()
val validGroups = consumerGroups.valid().get()
val validGroupsId = validGroups.asScala.map(_.groupId())
assert(validGroupsId.exists(_ === customGroupId), "Valid consumer groups don't " +
s"contain the expected group id - Valid consumer groups: $validGroupsId / " +
s"expected group id: $customGroupId")
}
test("read Kafka transactional messages: read_committed") {

View file

@@ -33,7 +33,7 @@ import kafka.server.{KafkaConfig, KafkaServer}
import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.utils.ZkUtils
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{AdminClient, CreatePartitionsOptions, NewPartitions}
import org.apache.kafka.clients.admin.{AdminClient, CreatePartitionsOptions, ListConsumerGroupsResult, NewPartitions}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.TopicPartition
@@ -311,6 +311,10 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
offsets
}
/**
 * Lists all consumer groups currently known to the embedded Kafka broker.
 * Delegates to the shared admin client; callers inspect the returned
 * [[ListConsumerGroupsResult]] (e.g. via `valid().get()`) to obtain group ids.
 */
def listConsumerGroups(): ListConsumerGroupsResult =
  adminClient.listConsumerGroups()
protected def brokerConfiguration: Properties = {
val props = new Properties()
props.put("broker.id", "0")