[SPARK-22094][SS] processAllAvailable should check the query state
## What changes were proposed in this pull request? `processAllAvailable` should also check the query state and if the query is stopped, it should return. ## How was this patch tested? The new unit test. Author: Shixiong Zhu <zsxwing@gmail.com> Closes #19314 from zsxwing/SPARK-22094.
This commit is contained in:
parent
f32a842505
commit
fedf6961be
|
@ -840,7 +840,7 @@ class StreamExecution(
|
||||||
if (streamDeathCause != null) {
|
if (streamDeathCause != null) {
|
||||||
throw streamDeathCause
|
throw streamDeathCause
|
||||||
}
|
}
|
||||||
if (noNewData) {
|
if (noNewData || !isActive) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -640,6 +640,18 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test("processAllAvailable should not block forever when a query is stopped") {
|
||||||
|
val input = MemoryStream[Int]
|
||||||
|
input.addData(1)
|
||||||
|
val query = input.toDF().writeStream
|
||||||
|
.trigger(Trigger.Once())
|
||||||
|
.format("console")
|
||||||
|
.start()
|
||||||
|
failAfter(streamingTimeout) {
|
||||||
|
query.processAllAvailable()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/** Create a streaming DF that only execute one batch in which it returns the given static DF */
|
/** Create a streaming DF that only execute one batch in which it returns the given static DF */
|
||||||
private def createSingleTriggerStreamingDF(triggerDF: DataFrame): DataFrame = {
|
private def createSingleTriggerStreamingDF(triggerDF: DataFrame): DataFrame = {
|
||||||
require(!triggerDF.isStreaming)
|
require(!triggerDF.isStreaming)
|
||||||
|
|
Loading…
Reference in a new issue