[SPARK-19718][SS] Handle more interrupt cases properly for Hadoop
## What changes were proposed in this pull request?
[SPARK-19617](https://issues.apache.org/jira/browse/SPARK-19617) changed `HDFSMetadataLog` to enable interrupts when using the local file system. However, now we hit [HADOOP-12074](https://issues.apache.org/jira/browse/HADOOP-12074): `Shell.runCommand` converts `InterruptedException` to `new IOException(ie.toString())` before Hadoop 2.8. This is the Hadoop patch to fix HADOOP-12074: 95c73d49b1
This PR adds new logic to handle the following cases related to `InterruptedException`.
- Check if the message of IOException starts with `java.lang.InterruptedException`. If so, treat it as `InterruptedException`. This is for pre-Hadoop 2.8.
- Treat `InterruptedIOException` as `InterruptedException`. This is for Hadoop 2.8+ and other places that may throw `InterruptedIOException` when the thread is interrupted.
## How was this patch tested?
The new unit tests.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #17044 from zsxwing/SPARK-19718.
This commit is contained in:
parent
f5fdbe0436
commit
a6a7a95e2f
|
@ -17,6 +17,7 @@
|
||||||
|
|
||||||
package org.apache.spark.sql.execution.streaming
|
package org.apache.spark.sql.execution.streaming
|
||||||
|
|
||||||
|
import java.io.{InterruptedIOException, IOException}
|
||||||
import java.util.UUID
|
import java.util.UUID
|
||||||
import java.util.concurrent.{CountDownLatch, TimeUnit}
|
import java.util.concurrent.{CountDownLatch, TimeUnit}
|
||||||
import java.util.concurrent.atomic.AtomicReference
|
import java.util.concurrent.atomic.AtomicReference
|
||||||
|
@ -37,6 +38,12 @@ import org.apache.spark.sql.execution.command.StreamingExplainCommand
|
||||||
import org.apache.spark.sql.streaming._
|
import org.apache.spark.sql.streaming._
|
||||||
import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
|
import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
|
||||||
|
|
||||||
|
/** States for [[StreamExecution]]'s lifecycle. */
|
||||||
|
trait State
|
||||||
|
case object INITIALIZING extends State
|
||||||
|
case object ACTIVE extends State
|
||||||
|
case object TERMINATED extends State
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Manages the execution of a streaming Spark SQL query that is occurring in a separate thread.
|
* Manages the execution of a streaming Spark SQL query that is occurring in a separate thread.
|
||||||
* Unlike a standard query, a streaming query executes repeatedly each time new data arrives at any
|
* Unlike a standard query, a streaming query executes repeatedly each time new data arrives at any
|
||||||
|
@ -298,7 +305,14 @@ class StreamExecution(
|
||||||
// `stop()` is already called. Let `finally` finish the cleanup.
|
// `stop()` is already called. Let `finally` finish the cleanup.
|
||||||
}
|
}
|
||||||
} catch {
|
} catch {
|
||||||
case _: InterruptedException if state.get == TERMINATED => // interrupted by stop()
|
case _: InterruptedException | _: InterruptedIOException if state.get == TERMINATED =>
|
||||||
|
// interrupted by stop()
|
||||||
|
updateStatusMessage("Stopped")
|
||||||
|
case e: IOException if e.getMessage != null
|
||||||
|
&& e.getMessage.startsWith(classOf[InterruptedException].getName)
|
||||||
|
&& state.get == TERMINATED =>
|
||||||
|
// This is a workaround for HADOOP-12074: `Shell.runCommand` converts `InterruptedException`
|
||||||
|
// to `new IOException(ie.toString())` before Hadoop 2.8.
|
||||||
updateStatusMessage("Stopped")
|
updateStatusMessage("Stopped")
|
||||||
case e: Throwable =>
|
case e: Throwable =>
|
||||||
streamDeathCause = new StreamingQueryException(
|
streamDeathCause = new StreamingQueryException(
|
||||||
|
@ -721,10 +735,6 @@ class StreamExecution(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
trait State
|
|
||||||
case object INITIALIZING extends State
|
|
||||||
case object ACTIVE extends State
|
|
||||||
case object TERMINATED extends State
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -17,6 +17,9 @@
|
||||||
|
|
||||||
package org.apache.spark.sql.streaming
|
package org.apache.spark.sql.streaming
|
||||||
|
|
||||||
|
import java.io.{InterruptedIOException, IOException}
|
||||||
|
import java.util.concurrent.{CountDownLatch, TimeoutException, TimeUnit}
|
||||||
|
|
||||||
import scala.reflect.ClassTag
|
import scala.reflect.ClassTag
|
||||||
import scala.util.control.ControlThrowable
|
import scala.util.control.ControlThrowable
|
||||||
|
|
||||||
|
@ -350,13 +353,45 @@ class StreamSuite extends StreamTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test("handle IOException when the streaming thread is interrupted (pre Hadoop 2.8)") {
|
||||||
|
// This test uses a fake source to throw the same IOException as pre Hadoop 2.8 when the
|
||||||
|
// streaming thread is interrupted. We should handle it properly by not failing the query.
|
||||||
|
ThrowingIOExceptionLikeHadoop12074.createSourceLatch = new CountDownLatch(1)
|
||||||
|
val query = spark
|
||||||
|
.readStream
|
||||||
|
.format(classOf[ThrowingIOExceptionLikeHadoop12074].getName)
|
||||||
|
.load()
|
||||||
|
.writeStream
|
||||||
|
.format("console")
|
||||||
|
.start()
|
||||||
|
assert(ThrowingIOExceptionLikeHadoop12074.createSourceLatch
|
||||||
|
.await(streamingTimeout.toMillis, TimeUnit.MILLISECONDS),
|
||||||
|
"ThrowingIOExceptionLikeHadoop12074.createSource wasn't called before timeout")
|
||||||
|
query.stop()
|
||||||
|
assert(query.exception.isEmpty)
|
||||||
|
}
|
||||||
|
|
||||||
|
test("handle InterruptedIOException when the streaming thread is interrupted (Hadoop 2.8+)") {
|
||||||
|
// This test uses a fake source to throw the same InterruptedIOException as Hadoop 2.8+ when the
|
||||||
|
// streaming thread is interrupted. We should handle it properly by not failing the query.
|
||||||
|
ThrowingInterruptedIOException.createSourceLatch = new CountDownLatch(1)
|
||||||
|
val query = spark
|
||||||
|
.readStream
|
||||||
|
.format(classOf[ThrowingInterruptedIOException].getName)
|
||||||
|
.load()
|
||||||
|
.writeStream
|
||||||
|
.format("console")
|
||||||
|
.start()
|
||||||
|
assert(ThrowingInterruptedIOException.createSourceLatch
|
||||||
|
.await(streamingTimeout.toMillis, TimeUnit.MILLISECONDS),
|
||||||
|
"ThrowingInterruptedIOException.createSource wasn't called before timeout")
|
||||||
|
query.stop()
|
||||||
|
assert(query.exception.isEmpty)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
abstract class FakeSource extends StreamSourceProvider {
|
||||||
* A fake StreamSourceProvider thats creates a fake Source that cannot be reused.
|
|
||||||
*/
|
|
||||||
class FakeDefaultSource extends StreamSourceProvider {
|
|
||||||
|
|
||||||
private val fakeSchema = StructType(StructField("a", IntegerType) :: Nil)
|
private val fakeSchema = StructType(StructField("a", IntegerType) :: Nil)
|
||||||
|
|
||||||
override def sourceSchema(
|
override def sourceSchema(
|
||||||
|
@ -364,6 +399,10 @@ class FakeDefaultSource extends StreamSourceProvider {
|
||||||
schema: Option[StructType],
|
schema: Option[StructType],
|
||||||
providerName: String,
|
providerName: String,
|
||||||
parameters: Map[String, String]): (String, StructType) = ("fakeSource", fakeSchema)
|
parameters: Map[String, String]): (String, StructType) = ("fakeSource", fakeSchema)
|
||||||
|
}
|
||||||
|
|
||||||
|
/** A fake StreamSourceProvider that creates a fake Source that cannot be reused. */
|
||||||
|
class FakeDefaultSource extends FakeSource {
|
||||||
|
|
||||||
override def createSource(
|
override def createSource(
|
||||||
spark: SQLContext,
|
spark: SQLContext,
|
||||||
|
@ -395,3 +434,63 @@ class FakeDefaultSource extends StreamSourceProvider {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** A fake source that throws the same IOException as pre Hadoop 2.8 when it's interrupted. */
|
||||||
|
class ThrowingIOExceptionLikeHadoop12074 extends FakeSource {
|
||||||
|
import ThrowingIOExceptionLikeHadoop12074._
|
||||||
|
|
||||||
|
override def createSource(
|
||||||
|
spark: SQLContext,
|
||||||
|
metadataPath: String,
|
||||||
|
schema: Option[StructType],
|
||||||
|
providerName: String,
|
||||||
|
parameters: Map[String, String]): Source = {
|
||||||
|
createSourceLatch.countDown()
|
||||||
|
try {
|
||||||
|
Thread.sleep(30000)
|
||||||
|
throw new TimeoutException("sleep was not interrupted in 30 seconds")
|
||||||
|
} catch {
|
||||||
|
case ie: InterruptedException =>
|
||||||
|
throw new IOException(ie.toString)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
object ThrowingIOExceptionLikeHadoop12074 {
|
||||||
|
/**
|
||||||
|
* A latch to allow the user to wait until [[ThrowingIOExceptionLikeHadoop12074.createSource]] is
|
||||||
|
* called.
|
||||||
|
*/
|
||||||
|
@volatile var createSourceLatch: CountDownLatch = null
|
||||||
|
}
|
||||||
|
|
||||||
|
/** A fake source that throws InterruptedIOException like Hadoop 2.8+ when it's interrupted. */
|
||||||
|
class ThrowingInterruptedIOException extends FakeSource {
|
||||||
|
import ThrowingInterruptedIOException._
|
||||||
|
|
||||||
|
override def createSource(
|
||||||
|
spark: SQLContext,
|
||||||
|
metadataPath: String,
|
||||||
|
schema: Option[StructType],
|
||||||
|
providerName: String,
|
||||||
|
parameters: Map[String, String]): Source = {
|
||||||
|
createSourceLatch.countDown()
|
||||||
|
try {
|
||||||
|
Thread.sleep(30000)
|
||||||
|
throw new TimeoutException("sleep was not interrupted in 30 seconds")
|
||||||
|
} catch {
|
||||||
|
case ie: InterruptedException =>
|
||||||
|
val iie = new InterruptedIOException(ie.toString)
|
||||||
|
iie.initCause(ie)
|
||||||
|
throw iie
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
object ThrowingInterruptedIOException {
|
||||||
|
/**
|
||||||
|
* A latch to allow the user to wait until [[ThrowingInterruptedIOException.createSource]] is
|
||||||
|
* called.
|
||||||
|
*/
|
||||||
|
@volatile var createSourceLatch: CountDownLatch = null
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue