[SPARK-18477][SS] Enable interrupts for HDFS in HDFSMetadataLog

## What changes were proposed in this pull request?

An HDFS `write` may hang until timeout if a network error occurs, so it is better to keep interrupts enabled and allow the query to be stopped quickly on HDFS.

This PR changes the logic to disable interrupts only for the local file system, since HADOOP-10622 affects only the local file system.
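The resulting dispatch can be sketched as follows. This is a simplified, self-contained model of the control flow, not Spark's actual classes: `FileManager`, `UninterruptibleThread`, and `add` are stubbed here, and `runUninterruptibly` does not really mask interrupts in this sketch.

```scala
// Hypothetical sketch of the new dispatch in HDFSMetadataLog.add().
// The real classes live in Spark/Hadoop; these stubs only model control flow.
trait FileManager {
  def isLocalFileSystem: Boolean
}

class UninterruptibleThread extends Thread {
  // Spark's real class disables interrupts inside this block; the stub
  // just runs the body so the example can execute anywhere.
  def runUninterruptibly[T](f: => T): T = f
}

def add(fileManager: FileManager)(writeBatch: => Unit): Boolean = {
  if (fileManager.isLocalFileSystem) {
    Thread.currentThread match {
      case ut: UninterruptibleThread =>
        // Local FS: the write may call Shell.runCommand, which can deadlock
        // if interrupted (HADOOP-10622), so run with interrupts disabled.
        ut.runUninterruptibly { writeBatch }
      case _ =>
        throw new IllegalStateException(
          "add() on a local file system must run on an UninterruptibleThread")
    }
  } else {
    // Distributed FS (HDFS, S3): keep interrupts enabled so a hung write
    // can be cancelled when the query is stopped.
    writeBatch
  }
  true
}
```

Note that the uninterruptible path still requires the caller to already be on an `UninterruptibleThread`; only the distributed-FS path is reachable from an arbitrary thread.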

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #15911 from zsxwing/interrupt-on-dfs.
This commit (e5f5c29e02, parent 40d59ff5ea) was authored by Shixiong Zhu on 2016-11-18 16:13:02 -08:00 and committed by Tathagata Das.

```diff
@@ -105,25 +105,34 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
   /**
    * Store the metadata for the specified batchId and return `true` if successful. If the batchId's
    * metadata has already been stored, this method will return `false`.
-   *
-   * Note that this method must be called on a [[org.apache.spark.util.UninterruptibleThread]]
-   * so that interrupts can be disabled while writing the batch file. This is because there is a
-   * potential dead-lock in Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622). If the thread
-   * running "Shell.runCommand" is interrupted, then the thread can get deadlocked. In our
-   * case, `writeBatch` creates a file using HDFS API and calls "Shell.runCommand" to set the
-   * file permissions, and can get deadlocked if the stream execution thread is stopped by
-   * interrupt. Hence, we make sure that this method is called on [[UninterruptibleThread]] which
-   * allows us to disable interrupts here. Also see SPARK-14131.
    */
  override def add(batchId: Long, metadata: T): Boolean = {
    get(batchId).map(_ => false).getOrElse {
      // Only write metadata when the batch has not yet been written
-      Thread.currentThread match {
-        case ut: UninterruptibleThread =>
-          ut.runUninterruptibly { writeBatch(batchId, metadata, serialize) }
-        case _ =>
-          throw new IllegalStateException(
-            "HDFSMetadataLog.add() must be executed on a o.a.spark.util.UninterruptibleThread")
+      if (fileManager.isLocalFileSystem) {
+        Thread.currentThread match {
+          case ut: UninterruptibleThread =>
+            // When using a local file system, "writeBatch" must be called on a
+            // [[org.apache.spark.util.UninterruptibleThread]] so that interrupts can be disabled
+            // while writing the batch file. This is because there is a potential dead-lock in
+            // Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622). If the thread running
+            // "Shell.runCommand" is interrupted, then the thread can get deadlocked. In our case,
+            // `writeBatch` creates a file using HDFS API and will call "Shell.runCommand" to set
+            // the file permission if using the local file system, and can get deadlocked if the
+            // stream execution thread is stopped by interrupt. Hence, we make sure that
+            // "writeBatch" is called on [[UninterruptibleThread]] which allows us to disable
+            // interrupts here. Also see SPARK-14131.
+            ut.runUninterruptibly { writeBatch(batchId, metadata, serialize) }
+          case _ =>
+            throw new IllegalStateException(
+              "HDFSMetadataLog.add() on a local file system must be executed on " +
+                "a o.a.spark.util.UninterruptibleThread")
+        }
+      } else {
+        // For a distributed file system, such as HDFS or S3, if the network is broken, write
+        // operations may just hang until timeout. We should enable interrupts to allow stopping
+        // the query fast.
+        writeBatch(batchId, metadata, serialize)
+      }
      true
    }
```
```diff
@@ -298,6 +307,9 @@ object HDFSMetadataLog {
    /** Recursively delete a path if it exists. Should not throw exception if file doesn't exist. */
    def delete(path: Path): Unit
+
+    /** Whether the file system is a local FS. */
+    def isLocalFileSystem: Boolean
  }

  /**
```
```diff
@@ -342,6 +354,13 @@ object HDFSMetadataLog {
        // ignore if file has already been deleted
      }
    }
+
+    override def isLocalFileSystem: Boolean = fc.getDefaultFileSystem match {
+      case _: local.LocalFs | _: local.RawLocalFs =>
+        // LocalFs = RawLocalFs + ChecksumFs
+        true
+      case _ => false
+    }
  }

  /**
```
```diff
@@ -398,5 +417,12 @@ object HDFSMetadataLog {
        // ignore if file has already been deleted
      }
    }
+
+    override def isLocalFileSystem: Boolean = fs match {
+      case _: LocalFileSystem | _: RawLocalFileSystem =>
+        // LocalFileSystem = RawLocalFileSystem + ChecksumFileSystem
+        true
+      case _ => false
+    }
  }
}
```
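Both `FileManager` implementations detect a local file system with the same pattern match. A stand-alone sketch of that check, with the Hadoop file-system classes stubbed out so it runs without a cluster (in real Hadoop these are `org.apache.hadoop.fs.LocalFileSystem` and friends, and `LocalFileSystem` is `RawLocalFileSystem` wrapped with checksums):

```scala
// Hypothetical stubs standing in for the Hadoop class hierarchy.
abstract class FileSystem
class RawLocalFileSystem extends FileSystem      // raw local FS, no checksums
class LocalFileSystem extends FileSystem         // checksummed local FS
class DistributedFileSystem extends FileSystem   // e.g. HDFS

// Matching on the concrete subclass catches both local variants, so the
// uninterruptible write path is taken for either one.
def isLocalFileSystem(fs: FileSystem): Boolean = fs match {
  case _: LocalFileSystem | _: RawLocalFileSystem => true
  case _ => false
}
```

Matching on the two local subclasses (rather than, say, inspecting the scheme string) mirrors the commit's approach and keeps any other `FileSystem` on the interruptible path by default.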