[SPARK-25472][SS] Don't have legitimate stops of streams cause stream exceptions

## What changes were proposed in this pull request?

Legitimate stops of streams may actually cause an exception to be captured by stream execution, because the job throws a SparkException regarding job cancellation during a stop. This PR makes the stop more graceful by swallowing this cancellation error.

## How was this patch tested?

This is pretty hard to test. The existing tests should make sure that we're not swallowing other specific SparkExceptions. I've also run the `KafkaSourceStressForDontFailOnDataLossSuite`100 times, and it didn't fail, whereas it used to be flaky.

Closes #22478 from brkyvz/SPARK-25472.

Authored-by: Burak Yavuz <brkyvz@gmail.com>
Signed-off-by: Burak Yavuz <brkyvz@gmail.com>
This commit is contained in:
Burak Yavuz 2018-09-20 15:46:33 -07:00
parent 4d114fc9a2
commit 77e52448e7
3 changed files with 20 additions and 8 deletions

View file

@ -30,6 +30,7 @@ import scala.util.control.NonFatal
import com.google.common.util.concurrent.UncheckedExecutionException
import org.apache.hadoop.fs.Path
import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@ -282,7 +283,7 @@ abstract class StreamExecution(
// `stop()` is already called. Let `finally` finish the cleanup.
}
} catch {
case e if isInterruptedByStop(e) =>
case e if isInterruptedByStop(e, sparkSession.sparkContext) =>
// interrupted by stop()
updateStatusMessage("Stopped")
case e: IOException if e.getMessage != null
@ -354,9 +355,9 @@ abstract class StreamExecution(
}
}
private def isInterruptedByStop(e: Throwable): Boolean = {
private def isInterruptedByStop(e: Throwable, sc: SparkContext): Boolean = {
if (state.get == TERMINATED) {
StreamExecution.isInterruptionException(e)
StreamExecution.isInterruptionException(e, sc)
} else {
false
}
@ -531,7 +532,7 @@ object StreamExecution {
val QUERY_ID_KEY = "sql.streaming.queryId"
val IS_CONTINUOUS_PROCESSING = "__is_continuous_processing"
def isInterruptionException(e: Throwable): Boolean = e match {
def isInterruptionException(e: Throwable, sc: SparkContext): Boolean = e match {
// InterruptedIOException - thrown when an I/O operation is interrupted
// ClosedByInterruptException - thrown when an I/O operation upon a channel is interrupted
case _: InterruptedException | _: InterruptedIOException | _: ClosedByInterruptException =>
@ -546,7 +547,18 @@ object StreamExecution {
// ExecutionException, such as BiFunction.apply
case e2 @ (_: UncheckedIOException | _: ExecutionException | _: UncheckedExecutionException)
if e2.getCause != null =>
isInterruptionException(e2.getCause)
isInterruptionException(e2.getCause, sc)
case se: SparkException =>
val jobGroup = sc.getLocalProperty("spark.jobGroup.id")
if (jobGroup == null) return false
val errorMsg = se.getMessage
if (errorMsg.contains("cancelled") && errorMsg.contains(jobGroup) && se.getCause == null) {
true
} else if (se.getCause != null) {
isInterruptionException(se.getCause, sc)
} else {
false
}
case _ =>
false
}

View file

@ -265,8 +265,8 @@ class ContinuousExecution(
sparkSessionForQuery, lastExecution)(lastExecution.toRdd)
}
} catch {
case t: Throwable
if StreamExecution.isInterruptionException(t) && state.get() == RECONFIGURING =>
case t: Throwable if StreamExecution.isInterruptionException(t, sparkSession.sparkContext) &&
state.get() == RECONFIGURING =>
logInfo(s"Query $id ignoring exception from reconfiguring: $t")
// interrupted by reconfiguration - swallow exception so we can restart the query
} finally {

View file

@ -57,7 +57,7 @@ case class WriteToContinuousDataSourceExec(writeSupport: StreamingWriteSupport,
case cause: Throwable =>
cause match {
// Do not wrap interruption exceptions that will be handled by streaming specially.
case _ if StreamExecution.isInterruptionException(cause) => throw cause
case _ if StreamExecution.isInterruptionException(cause, sparkContext) => throw cause
// Only wrap non fatal exceptions.
case NonFatal(e) => throw new SparkException("Writing job aborted.", e)
case _ => throw cause