[SPARK-31324][SS] Include stream ID in the termination timeout error message

### What changes were proposed in this pull request?

This PR (SPARK-31324) aims to include stream ID in the error thrown when a stream does not stop() in time.

### Why are the changes needed?

https://github.com/apache/spark/pull/26771/ added a conf to set a requested timeout for stopping a stream, after which the stop() method throws. From seeing this in a production use case with several streams running, it's helpful to include which stream failed to stop in the error message.

### Does this PR introduce any user-facing change?

If a stream times out when terminating, the error message now includes the stream ID.

Before:
`Stream Execution thread failed to stop within 2000 milliseconds (specified by spark.sql.streaming.stopTimeout). See the cause on what was being executed in the streaming query thread.`

After:
`Stream Execution thread for stream [id = 8513769d-b9d2-4902-9b36-3668bd022245, runId = 21ed8c35-9bfe-423f-853d-c022d91818bc] failed to stop within 2000 milliseconds (specified by spark.sql.streaming.stopTimeout). See the cause on what was being executed in the streaming query thread.`

### How was this patch tested?

Updated existing unit test

Closes #28095 from mukulmurthy/31324-id.

Authored-by: Mukul Murthy <mukul.murthy@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
This commit is contained in:
Mukul Murthy 2020-04-02 12:37:58 +09:00 committed by HyukjinKwon
parent 09f036a14c
commit 34abbb677d
2 changed files with 5 additions and 4 deletions

View file

@ -451,9 +451,9 @@ abstract class StreamExecution(
val stackTraceException = new SparkException("The stream thread was last executing:")
stackTraceException.setStackTrace(queryExecutionThread.getStackTrace)
val timeoutException = new TimeoutException(
s"Stream Execution thread failed to stop within $timeout milliseconds (specified by " +
s"${SQLConf.STREAMING_STOP_TIMEOUT.key}). See the cause on what was " +
"being executed in the streaming query thread.")
s"Stream Execution thread for stream $prettyIdString failed to stop within $timeout " +
s"milliseconds (specified by ${SQLConf.STREAMING_STOP_TIMEOUT.key}). See the cause on " +
s"what was being executed in the streaming query thread.")
timeoutException.initCause(stackTraceException)
throw timeoutException
}

View file

@ -1245,9 +1245,10 @@ class StreamSuite extends StreamTest {
failAfter(60.seconds) {
val startTime = System.nanoTime()
withSQLConf(SQLConf.STREAMING_STOP_TIMEOUT.key -> "2000") {
intercept[TimeoutException] {
val ex = intercept[TimeoutException] {
sq.stop()
}
assert(ex.getMessage.contains(sq.id.toString))
}
val duration = (System.nanoTime() - startTime) / 1e6
assert(duration >= 2000,