diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index 714da92bcd..d9fad5e84f 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -2429,8 +2429,9 @@ class KafkaSourceStressSuite extends KafkaSourceTest { (d, running) => { Random.nextInt(5) match { case 0 => // Add a new topic - topics = topics ++ Seq(newStressTopic) - AddKafkaData(topics.toSet, d: _*)(message = s"Add topic $newStressTopic", + val newTopic = newStressTopic + topics = topics ++ Seq(newTopic) + AddKafkaData(topics.toSet, d: _*)(message = s"Add topic $newTopic", topicAction = (topic, partition) => { if (partition.isEmpty) { testUtils.createTopic(topic, partitions = nextInt(1, 6)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index 624b630401..ff182b524b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -283,7 +283,7 @@ trait StreamTest extends QueryTest with SharedSparkSession with TimeLimits with /** Assert that a condition on the active query is true */ class AssertOnQuery(val condition: StreamExecution => Boolean, val message: String) - extends StreamAction { + extends StreamAction with StreamMustBeRunning { override def toString: String = s"AssertOnQuery(, $message)" } @@ -871,6 +871,10 @@ trait StreamTest extends QueryTest with SharedSparkSession with TimeLimits with case r if r < 0.7 => // AddData addRandomData() + // In some suites, e.g. `KafkaSourceStressSuite`, we delete Kafka topic in the + // `addData` closure. In the case, the topic with added data might be deleted + // before next check. So we must check data after adding data here. + addCheck() case _ => // StopStream addCheck()