From af7dd18a5ede5ed87c3f1a13100633f7f60d2cd8 Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Fri, 17 Sep 2021 21:28:02 +0800 Subject: [PATCH] [SPARK-36764][SS][TEST] Fix race-condition on "ensure continuous stream is being used" in KafkaContinuousTest MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What changes were proposed in this pull request? The test “ensure continuous stream is being used“ in KafkaContinuousTest quickly checks the actual type of the execution, and stops the query. Stopping the streaming query in continuous mode is done by interrupting query execution thread and join with indefinite timeout. In parallel, started streaming query is going to generate execution plan, including running optimizer. Some parts of SessionState can be built at that time, as they are defined as lazy. The problem is, some of them seem to “swallow” the InterruptedException and let the thread run continuously. That said, the query can’t indicate whether there is a request on stopping query, so the query won’t stop. This PR fixes such scenario via ensuring that streaming query has started before the test stops the query. ### Why are the changes needed? Race-condition could end up with test hang till test framework marks it as timed-out. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests. Closes #34004 from HeartSaVioR/SPARK-36764. Authored-by: Jungtaek Lim Signed-off-by: Wenchen Fan (cherry picked from commit 6099edc66eb35db548230eeaba791c730eb38f84) Signed-off-by: Wenchen Fan --- .../org/apache/spark/sql/kafka010/KafkaContinuousTest.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala index 9ee8cbfa1b..4b6a5b899f 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala @@ -91,6 +91,7 @@ trait KafkaContinuousTest extends KafkaSourceTest { .load() testStream(query)( + makeSureGetOffsetCalled, Execute(q => assert(q.isInstanceOf[ContinuousExecution])) ) }