[SPARK-23419][SPARK-23416][SS] data source v2 write path should re-throw interruption exceptions directly

## What changes were proposed in this pull request?

Streaming execution has a list of exceptions that means interruption, and handle them specially. `WriteToDataSourceV2Exec` should also respect this list and not wrap them with `SparkException`.

## How was this patch tested?

existing test.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #20605 from cloud-fan/write.
This commit is contained in:
Wenchen Fan 2018-02-15 16:59:44 +08:00
parent 95e4b49160
commit f38c760638
2 changed files with 31 additions and 20 deletions

View file

@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.datasources.v2
import scala.util.control.NonFatal
import org.apache.spark.{SparkEnv, SparkException, TaskContext}
import org.apache.spark.executor.CommitDeniedException
import org.apache.spark.internal.Logging
@ -27,6 +29,7 @@ import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.streaming.StreamExecution
import org.apache.spark.sql.execution.streaming.continuous.{CommitPartitionEpoch, ContinuousExecution, EpochCoordinatorRef, SetWriterPartitions}
import org.apache.spark.sql.sources.v2.writer._
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
@ -107,7 +110,13 @@ case class WriteToDataSourceV2Exec(writer: DataSourceWriter, query: SparkPlan) e
throw new SparkException("Writing job failed.", cause)
}
logError(s"Data source writer $writer aborted.")
throw new SparkException("Writing job aborted.", cause)
cause match {
// Do not wrap interruption exceptions that will be handled by streaming specially.
case _ if StreamExecution.isInterruptionException(cause) => throw cause
// Only wrap non fatal exceptions.
case NonFatal(e) => throw new SparkException("Writing job aborted.", e)
case _ => throw cause
}
}
sparkContext.emptyRDD

View file

@ -356,25 +356,7 @@ abstract class StreamExecution(
private def isInterruptedByStop(e: Throwable): Boolean = {
if (state.get == TERMINATED) {
e match {
// InterruptedIOException - thrown when an I/O operation is interrupted
// ClosedByInterruptException - thrown when an I/O operation upon a channel is interrupted
case _: InterruptedException | _: InterruptedIOException | _: ClosedByInterruptException =>
true
// The cause of the following exceptions may be one of the above exceptions:
//
// UncheckedIOException - thrown by codes that cannot throw a checked IOException, such as
// BiFunction.apply
// ExecutionException - thrown by codes running in a thread pool and these codes throw an
// exception
// UncheckedExecutionException - thrown by codes that cannot throw a checked
// ExecutionException, such as BiFunction.apply
case e2 @ (_: UncheckedIOException | _: ExecutionException | _: UncheckedExecutionException)
if e2.getCause != null =>
isInterruptedByStop(e2.getCause)
case _ =>
false
}
StreamExecution.isInterruptionException(e)
} else {
false
}
@ -565,6 +547,26 @@ abstract class StreamExecution(
object StreamExecution {
val QUERY_ID_KEY = "sql.streaming.queryId"
def isInterruptionException(e: Throwable): Boolean = e match {
// InterruptedIOException - thrown when an I/O operation is interrupted
// ClosedByInterruptException - thrown when an I/O operation upon a channel is interrupted
case _: InterruptedException | _: InterruptedIOException | _: ClosedByInterruptException =>
true
// The cause of the following exceptions may be one of the above exceptions:
//
// UncheckedIOException - thrown by codes that cannot throw a checked IOException, such as
// BiFunction.apply
// ExecutionException - thrown by codes running in a thread pool and these codes throw an
// exception
// UncheckedExecutionException - thrown by codes that cannot throw a checked
// ExecutionException, such as BiFunction.apply
case e2 @ (_: UncheckedIOException | _: ExecutionException | _: UncheckedExecutionException)
if e2.getCause != null =>
isInterruptionException(e2.getCause)
case _ =>
false
}
}
/**