[SPARK-15508][STREAMING][TESTS] Fix flaky test: JavaKafkaStreamSuite.testKafkaStream
## What changes were proposed in this pull request? `JavaKafkaStreamSuite.testKafkaStream` assumes when `sent.size == result.size`, the contents of `sent` and `result` should be same. However, that's not true. The content of `result` may not be the final content. This PR modified the test to always retry the assertions even if the contents of `sent` and `result` are not same. Here is the failure in Jenkins: http://spark-tests.appspot.com/tests/org.apache.spark.streaming.kafka.JavaKafkaStreamSuite/testKafkaStream ## How was this patch tested? Jenkins unit tests. Author: Shixiong Zhu <shixiong@databricks.com> Closes #13281 from zsxwing/flaky-kafka-test.
This commit is contained in:
parent
50b660d725
commit
c9c1c0e54d
|
@ -122,14 +122,23 @@ public class JavaKafkaStreamSuite implements Serializable {
|
|||
ssc.start();
|
||||
|
||||
long startTime = System.currentTimeMillis();
|
||||
boolean sizeMatches = false;
|
||||
while (!sizeMatches && System.currentTimeMillis() - startTime < 20000) {
|
||||
sizeMatches = sent.size() == result.size();
|
||||
AssertionError lastError = null;
|
||||
while (System.currentTimeMillis() - startTime < 20000) {
|
||||
try {
|
||||
Assert.assertEquals(sent.size(), result.size());
|
||||
for (Map.Entry<String, Integer> e : sent.entrySet()) {
|
||||
Assert.assertEquals(e.getValue().intValue(), result.get(e.getKey()).intValue());
|
||||
}
|
||||
return;
|
||||
} catch (AssertionError e) {
|
||||
lastError = e;
|
||||
}
|
||||
Thread.sleep(200);
|
||||
}
|
||||
Assert.assertEquals(sent.size(), result.size());
|
||||
for (Map.Entry<String, Integer> e : sent.entrySet()) {
|
||||
Assert.assertEquals(e.getValue().intValue(), result.get(e.getKey()).intValue());
|
||||
if (lastError != null) {
|
||||
throw lastError;
|
||||
} else {
|
||||
Assert.fail("timeout");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue