diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index 64020882b3..cb45384613 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -28,6 +28,7 @@ import scala.collection.JavaConverters._ import scala.io.Source import scala.util.Random +import org.apache.kafka.clients.admin.{AdminClient, ConsumerGroupListing} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata} import org.apache.kafka.common.TopicPartition import org.scalatest.concurrent.PatienceConfiguration.Timeout @@ -638,10 +639,11 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { testUtils.sendMessages(topic, (11 to 20).map(_.toString).toArray, Some(1)) testUtils.sendMessages(topic, (21 to 30).map(_.toString).toArray, Some(2)) + val customGroupId = "id-" + Random.nextInt() val dsKafka = spark .readStream .format("kafka") - .option("kafka.group.id", "id-" + Random.nextInt()) + .option("kafka.group.id", customGroupId) .option("kafka.bootstrap.servers", testUtils.brokerAddress) .option("subscribe", topic) .option("startingOffsets", "earliest") @@ -652,7 +654,15 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { testStream(dsKafka)( makeSureGetOffsetCalled, - CheckAnswer(1 to 30: _*) + CheckAnswer(1 to 30: _*), + Execute { _ => + val consumerGroups = testUtils.listConsumerGroups() + val validGroups = consumerGroups.valid().get() + val validGroupsId = validGroups.asScala.map(_.groupId()) + assert(validGroupsId.exists(_ === customGroupId), "Valid consumer groups don't " + + s"contain the expected group id - Valid consumer groups: $validGroupsId / " + + s"expected group id: $customGroupId") + } ) } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala index efe7385ed1..2cd13a994e 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala @@ -20,6 +20,9 @@ package org.apache.spark.sql.kafka010 import java.util.Locale import java.util.concurrent.atomic.AtomicInteger +import scala.collection.JavaConverters._ +import scala.util.Random + import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.TopicPartition @@ -247,8 +250,16 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest testUtils.sendMessages(topic, (11 to 20).map(_.toString).toArray, Some(1)) testUtils.sendMessages(topic, (21 to 30).map(_.toString).toArray, Some(2)) - val df = createDF(topic, withOptions = Map("kafka.group.id" -> "custom")) + val customGroupId = "id-" + Random.nextInt() + val df = createDF(topic, withOptions = Map("kafka.group.id" -> customGroupId)) checkAnswer(df, (1 to 30).map(_.toString).toDF()) + + val consumerGroups = testUtils.listConsumerGroups() + val validGroups = consumerGroups.valid().get() + val validGroupsId = validGroups.asScala.map(_.groupId()) + assert(validGroupsId.exists(_ === customGroupId), "Valid consumer groups don't " + + s"contain the expected group id - Valid consumer groups: $validGroupsId / " + + s"expected group id: $customGroupId") } test("read Kafka transactional messages: read_committed") { diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index bf6934be52..dacfffa867 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -33,7 +33,7 @@ import kafka.server.{KafkaConfig, KafkaServer} import kafka.server.checkpoints.OffsetCheckpointFile import kafka.utils.ZkUtils import org.apache.kafka.clients.CommonClientConfigs -import org.apache.kafka.clients.admin.{AdminClient, CreatePartitionsOptions, NewPartitions} +import org.apache.kafka.clients.admin.{AdminClient, CreatePartitionsOptions, ListConsumerGroupsResult, NewPartitions} import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.clients.producer._ import org.apache.kafka.common.TopicPartition @@ -311,6 +311,10 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L offsets } + def listConsumerGroups(): ListConsumerGroupsResult = { + adminClient.listConsumerGroups() + } + protected def brokerConfiguration: Properties = { val props = new Properties() props.put("broker.id", "0")