[SPARK-35027][CORE] Close the inputStream in FileAppender when writin…

### What changes were proposed in this pull request?

1. add "closeStreams" to FileAppender and RollingFileAppender
2. set "closeStreams" to "true" in ExecutorRunner

### Why are the changes needed?

The executor can hang when the disk is full or another exception occurs while writing to the outputStream; the root cause is that the "inputStream" is not closed after the error happens:
1. ExecutorRunner creates two file appenders for the process's pipes: one for stdout, one for stderr
2. FileAppender.appendStreamToFile exits its copy loop when writing to the outputStream fails
3. FileAppender closes the outputStream, but leaves the inputStream, which refers to the pipe's stdout and stderr, open
4. The executor then hangs when printing a log message once the pipe is full (nothing consumes its output)
5. From the driver side, the task never completes

With this fix, step 4 throws an exception instead; the driver can catch the failure and reschedule the failed task on other executors.
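
For illustration, the hang in step 4 is ordinary pipe behavior: a writer blocks once the pipe buffer is full and nothing reads it, and a write to a pipe whose read end has been closed fails instead of blocking. Below is a minimal sketch of both effects (not part of this patch; it assumes a Unix-like system where the `yes` command is available):

```scala
import java.util.concurrent.TimeUnit

object PipeHangSketch {
  def main(args: Array[String]): Unit = {
    // Child process that writes to stdout forever; the parent never reads the pipe,
    // mimicking an appender that stopped copying without closing its inputStream.
    val child = new ProcessBuilder("yes").start()

    // Once the OS pipe buffer fills up, the child's writes block, so it never exits.
    val exitedWhileUnread = child.waitFor(2, TimeUnit.SECONDS)
    println(s"exited while the pipe was open but unread: $exitedWhileUnread") // expected: false

    // Closing the read end (what closeStreams = true does when the appender stops)
    // turns the child's blocked write into a broken-pipe error, so it can terminate.
    child.getInputStream.close()
    val exitedAfterClose = child.waitFor(2, TimeUnit.SECONDS)
    println(s"exited after the read end was closed: $exitedAfterClose") // expected: true

    child.destroyForcibly()
  }
}
```

Closing the appender's inputStream when it stops (the new "closeStreams" flag) applies the second effect to the executor's stdout/stderr pipes, so the executor process fails fast instead of hanging.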

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added new tests for "closeStreams" to FileAppenderSuite.

Closes #33263 from jhu-chang/SPARK-35027.

Authored-by: Jie <gt.hu.chang@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
Jie 2021-07-20 21:23:51 -05:00 committed by Sean Owen
parent 0eb31a06d6
commit 1a8c6755a1
4 changed files with 68 additions and 14 deletions

ExecutorRunner.scala

@@ -185,11 +185,11 @@ private[deploy] class ExecutorRunner(
       // Redirect its stdout and stderr to files
       val stdout = new File(executorDir, "stdout")
-      stdoutAppender = FileAppender(process.getInputStream, stdout, conf)
+      stdoutAppender = FileAppender(process.getInputStream, stdout, conf, true)

       val stderr = new File(executorDir, "stderr")
       Files.write(header, stderr, StandardCharsets.UTF_8)
-      stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
+      stderrAppender = FileAppender(process.getErrorStream, stderr, conf, true)

       state = ExecutorState.RUNNING
       worker.send(ExecutorStateChanged(appId, execId, state, None, None))

FileAppender.scala

@@ -26,8 +26,12 @@ import org.apache.spark.util.{IntParam, Utils}
 /**
  * Continuously appends the data from an input stream into the given file.
  */
-private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSize: Int = 8192)
-  extends Logging {
+private[spark] class FileAppender(
+    inputStream: InputStream,
+    file: File,
+    bufferSize: Int = 8192,
+    closeStreams: Boolean = false
+  ) extends Logging {
   @volatile private var outputStream: FileOutputStream = null
   @volatile private var markedForStop = false     // has the appender been asked to stopped
@@ -76,7 +80,13 @@ private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSi
           }
         }
       } {
-        closeFile()
+        try {
+          if (closeStreams) {
+            inputStream.close()
+          }
+        } finally {
+          closeFile()
+        }
       }
     } catch {
       case e: Exception =>
@@ -113,7 +123,12 @@ private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSi
 private[spark] object FileAppender extends Logging {

   /** Create the right appender based on Spark configuration */
-  def apply(inputStream: InputStream, file: File, conf: SparkConf): FileAppender = {
+  def apply(
+      inputStream: InputStream,
+      file: File,
+      conf: SparkConf,
+      closeStreams: Boolean = false
+    ) : FileAppender = {

     val rollingStrategy = conf.get(config.EXECUTOR_LOGS_ROLLING_STRATEGY)
     val rollingSizeBytes = conf.get(config.EXECUTOR_LOGS_ROLLING_MAX_SIZE)
@@ -141,9 +156,10 @@ private[spark] object FileAppender extends Logging {
       validatedParams.map {
         case (interval, pattern) =>
           new RollingFileAppender(
-            inputStream, file, new TimeBasedRollingPolicy(interval, pattern), conf)
+            inputStream, file, new TimeBasedRollingPolicy(interval, pattern), conf,
+            closeStreams = closeStreams)
       }.getOrElse {
-        new FileAppender(inputStream, file)
+        new FileAppender(inputStream, file, closeStreams = closeStreams)
       }
     }
@@ -151,17 +167,18 @@ private[spark] object FileAppender extends Logging {
       rollingSizeBytes match {
         case IntParam(bytes) =>
           logInfo(s"Rolling executor logs enabled for $file with rolling every $bytes bytes")
-          new RollingFileAppender(inputStream, file, new SizeBasedRollingPolicy(bytes), conf)
+          new RollingFileAppender(
+            inputStream, file, new SizeBasedRollingPolicy(bytes), conf, closeStreams = closeStreams)
         case _ =>
           logWarning(
             s"Illegal size [$rollingSizeBytes] for rolling executor logs, rolling logs not enabled")
-          new FileAppender(inputStream, file)
+          new FileAppender(inputStream, file, closeStreams = closeStreams)
       }
     }

     rollingStrategy match {
       case "" =>
-        new FileAppender(inputStream, file)
+        new FileAppender(inputStream, file, closeStreams = closeStreams)
       case "time" =>
         createTimeBasedAppender()
       case "size" =>
@@ -170,7 +187,7 @@ private[spark] object FileAppender extends Logging {
         logWarning(
           s"Illegal strategy [$rollingStrategy] for rolling executor logs, " +
             s"rolling logs not enabled")
-        new FileAppender(inputStream, file)
+        new FileAppender(inputStream, file, closeStreams = closeStreams)
     }
   }
 }

RollingFileAppender.scala

@@ -36,14 +36,16 @@ import org.apache.spark.internal.config
  * @param rollingPolicy Policy based on which files will be rolled over.
  * @param conf SparkConf that is used to pass on extra configurations
  * @param bufferSize Optional buffer size. Used mainly for testing.
+ * @param closeStreams Option flag: whether to close the inputStream at the end.
  */
 private[spark] class RollingFileAppender(
     inputStream: InputStream,
     activeFile: File,
     val rollingPolicy: RollingPolicy,
     conf: SparkConf,
-    bufferSize: Int = RollingFileAppender.DEFAULT_BUFFER_SIZE
-  ) extends FileAppender(inputStream, activeFile, bufferSize) {
+    bufferSize: Int = RollingFileAppender.DEFAULT_BUFFER_SIZE,
+    closeStreams: Boolean = false
+  ) extends FileAppender(inputStream, activeFile, bufferSize, closeStreams) {

   private val maxRetainedFiles = conf.get(config.EXECUTOR_LOGS_ROLLING_MAX_RETAINED_FILES)
   private val enableCompression = conf.get(config.EXECUTOR_LOGS_ROLLING_ENABLE_COMPRESSION)

FileAppenderSuite.scala

@@ -61,6 +61,15 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
     assert(Files.toString(testFile, StandardCharsets.UTF_8) === header + testString)
   }

+  test("SPARK-35027: basic file appender - close stream") {
+    val inputStream = mock(classOf[InputStream])
+    val appender = new FileAppender(inputStream, testFile, closeStreams = true)
+    Thread.sleep(10)
+    appender.stop()
+    appender.awaitTermination()
+    verify(inputStream).close()
+  }
+
   test("rolling file appender - time-based rolling") {
     // setup input stream and appender
     val testOutputStream = new PipedOutputStream()
@@ -96,6 +105,32 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
       appender, testOutputStream, textToAppend, rolloverIntervalMillis, isCompressed = true)
   }

+  test("SPARK-35027: rolling file appender - time-based rolling close stream") {
+    val inputStream = mock(classOf[InputStream])
+    val sparkConf = new SparkConf()
+    sparkConf.set(config.EXECUTOR_LOGS_ROLLING_STRATEGY.key, "time")
+    val appender = FileAppender(inputStream, testFile, sparkConf, closeStreams = true)
+    assert(
+      appender.asInstanceOf[RollingFileAppender].rollingPolicy.isInstanceOf[TimeBasedRollingPolicy])
+    Thread.sleep(10)
+    appender.stop()
+    appender.awaitTermination()
+    verify(inputStream).close()
+  }
+
+  test("SPARK-35027: rolling file appender - size-based rolling close stream") {
+    val inputStream = mock(classOf[InputStream])
+    val sparkConf = new SparkConf()
+    sparkConf.set(config.EXECUTOR_LOGS_ROLLING_STRATEGY.key, "size")
+    val appender = FileAppender(inputStream, testFile, sparkConf, closeStreams = true)
+    assert(
+      appender.asInstanceOf[RollingFileAppender].rollingPolicy.isInstanceOf[SizeBasedRollingPolicy])
+    Thread.sleep(10)
+    appender.stop()
+    appender.awaitTermination()
+    verify(inputStream).close()
+  }
+
   test("rolling file appender - size-based rolling") {
     // setup input stream and appender
     val testOutputStream = new PipedOutputStream()