From 77e52448e7f94aadfa852cc67084415de6ecfa7c Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Thu, 20 Sep 2018 15:46:33 -0700
Subject: [PATCH] [SPARK-25472][SS] Don't have legitimate stops of streams
 cause stream exceptions

## What changes were proposed in this pull request?

Legitimate stops of streams may actually cause an exception to be captured by stream execution, because the job throws a SparkException regarding job cancellation during a stop. This PR makes the stop more graceful by swallowing this cancellation error.

## How was this patch tested?

This is pretty hard to test. The existing tests should make sure that we're not swallowing other specific SparkExceptions. I've also run the `KafkaSourceStressForDontFailOnDataLossSuite` 100 times, and it didn't fail, whereas it used to be flaky.

Closes #22478 from brkyvz/SPARK-25472.

Authored-by: Burak Yavuz
Signed-off-by: Burak Yavuz
---
 .../execution/streaming/StreamExecution.scala | 22 ++++++++++++++-----
 .../continuous/ContinuousExecution.scala      |  4 ++--
 .../WriteToContinuousDataSourceExec.scala     |  2 +-
 3 files changed, 20 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index f6c60c1c92..631a6eb649 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -30,6 +30,7 @@ import scala.util.control.NonFatal
 import com.google.common.util.concurrent.UncheckedExecutionException
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.{SparkContext, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -282,7 +283,7 @@ abstract class StreamExecution(
         // `stop()` is already called. Let `finally` finish the cleanup.
       }
     } catch {
-      case e if isInterruptedByStop(e) =>
+      case e if isInterruptedByStop(e, sparkSession.sparkContext) =>
         // interrupted by stop()
         updateStatusMessage("Stopped")
       case e: IOException if e.getMessage != null
@@ -354,9 +355,9 @@ abstract class StreamExecution(
     }
   }
 
-  private def isInterruptedByStop(e: Throwable): Boolean = {
+  private def isInterruptedByStop(e: Throwable, sc: SparkContext): Boolean = {
     if (state.get == TERMINATED) {
-      StreamExecution.isInterruptionException(e)
+      StreamExecution.isInterruptionException(e, sc)
     } else {
       false
     }
@@ -531,7 +532,7 @@ object StreamExecution {
   val QUERY_ID_KEY = "sql.streaming.queryId"
   val IS_CONTINUOUS_PROCESSING = "__is_continuous_processing"
 
-  def isInterruptionException(e: Throwable): Boolean = e match {
+  def isInterruptionException(e: Throwable, sc: SparkContext): Boolean = e match {
     // InterruptedIOException - thrown when an I/O operation is interrupted
     // ClosedByInterruptException - thrown when an I/O operation upon a channel is interrupted
     case _: InterruptedException | _: InterruptedIOException | _: ClosedByInterruptException =>
@@ -546,7 +547,18 @@ object StreamExecution {
     // ExecutionException, such as BiFunction.apply
     case e2 @ (_: UncheckedIOException | _: ExecutionException | _: UncheckedExecutionException)
       if e2.getCause != null =>
-      isInterruptionException(e2.getCause)
+      isInterruptionException(e2.getCause, sc)
+    case se: SparkException =>
+      val jobGroup = sc.getLocalProperty("spark.jobGroup.id")
+      if (jobGroup == null) return false
+      val errorMsg = se.getMessage
+      if (errorMsg.contains("cancelled") && errorMsg.contains(jobGroup) && se.getCause == null) {
+        true
+      } else if (se.getCause != null) {
+        isInterruptionException(se.getCause, sc)
+      } else {
+        false
+      }
     case _ =>
       false
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index ccca72667a..f009c52449 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -265,8 +265,8 @@ class ContinuousExecution(
             sparkSessionForQuery, lastExecution)(lastExecution.toRdd)
         }
       } catch {
-        case t: Throwable
-          if StreamExecution.isInterruptionException(t) && state.get() == RECONFIGURING =>
+        case t: Throwable if StreamExecution.isInterruptionException(t, sparkSession.sparkContext) &&
+            state.get() == RECONFIGURING =>
           logInfo(s"Query $id ignoring exception from reconfiguring: $t")
           // interrupted by reconfiguration - swallow exception so we can restart the query
       } finally {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
index c216b61383..a797ac1879 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
@@ -57,7 +57,7 @@ case class WriteToContinuousDataSourceExec(writeSupport: StreamingWriteSupport,
       case cause: Throwable =>
         cause match {
           // Do not wrap interruption exceptions that will be handled by streaming specially.
-          case _ if StreamExecution.isInterruptionException(cause) => throw cause
+          case _ if StreamExecution.isInterruptionException(cause, sparkContext) => throw cause
           // Only wrap non fatal exceptions.
           case NonFatal(e) => throw new SparkException("Writing job aborted.", e)
           case _ => throw cause
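
A minimal, self-contained Scala sketch (not part of the patch) of the rule the new `SparkException` branch adds to `StreamExecution.isInterruptionException`: a cancellation error is treated as a legitimate stop only when its message mentions both "cancelled" and the query's job group id and it carries no further cause; otherwise the cause chain is inspected. `FakeSparkException`, `looksLikeStopCancellation`, and the example message text are placeholders; the real code matches on `org.apache.spark.SparkException` and reads the group from `sc.getLocalProperty("spark.jobGroup.id")`.

```scala
// Standalone illustration of the cancellation check; not Spark code.
object CancellationCheckSketch {

  // Stand-in for org.apache.spark.SparkException (placeholder type).
  final class FakeSparkException(message: String, cause: Throwable = null)
    extends Exception(message, cause)

  // jobGroup stands in for sc.getLocalProperty("spark.jobGroup.id").
  def looksLikeStopCancellation(e: Throwable, jobGroup: String): Boolean = e match {
    case se: FakeSparkException if jobGroup != null =>
      val msg = Option(se.getMessage).getOrElse("")
      if (msg.contains("cancelled") && msg.contains(jobGroup) && se.getCause == null) {
        true                                              // looks like our own stop()
      } else if (se.getCause != null) {
        looksLikeStopCancellation(se.getCause, jobGroup)  // unwrap nested causes
      } else {
        false                                             // some other SparkException
      }
    case _ => false
  }

  def main(args: Array[String]): Unit = {
    val group = "run-1234"
    // The message shape below is an assumption for illustration only.
    val stop = new FakeSparkException(s"Job 7 cancelled because job group $group was cancelled")
    val other = new FakeSparkException("Task failed while writing rows")
    println(looksLikeStopCancellation(stop, group))   // true  -> swallowed as a graceful stop
    println(looksLikeStopCancellation(other, group))  // false -> still surfaces as a query failure
  }
}
```

One design note on the patch itself: the check deliberately requires the query's own job group id in the message, so cancellations of unrelated job groups are not silently swallowed.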