diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 2e26ccf671..974c2d670c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -185,11 +185,11 @@ private[deploy] class ExecutorRunner(
 
       // Redirect its stdout and stderr to files
       val stdout = new File(executorDir, "stdout")
-      stdoutAppender = FileAppender(process.getInputStream, stdout, conf)
+      stdoutAppender = FileAppender(process.getInputStream, stdout, conf, true)
 
       val stderr = new File(executorDir, "stderr")
       Files.write(header, stderr, StandardCharsets.UTF_8)
-      stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
+      stderrAppender = FileAppender(process.getErrorStream, stderr, conf, true)
 
       state = ExecutorState.RUNNING
       worker.send(ExecutorStateChanged(appId, execId, state, None, None))
diff --git a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
index 7107be25eb..2243239dce 100644
--- a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
+++ b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
@@ -26,8 +26,12 @@ import org.apache.spark.util.{IntParam, Utils}
 /**
  * Continuously appends the data from an input stream into the given file.
  */
-private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSize: Int = 8192)
-  extends Logging {
+private[spark] class FileAppender(
+    inputStream: InputStream,
+    file: File,
+    bufferSize: Int = 8192,
+    closeStreams: Boolean = false
+) extends Logging {
 
   @volatile private var outputStream: FileOutputStream = null
   @volatile private var markedForStop = false     // has the appender been asked to stopped
@@ -76,7 +80,13 @@ private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSi
           }
         }
       } {
-        closeFile()
+        try {
+          if (closeStreams) {
+            inputStream.close()
+          }
+        } finally {
+          closeFile()
+        }
       }
     } catch {
       case e: Exception =>
@@ -113,7 +123,12 @@ private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSi
 private[spark] object FileAppender extends Logging {
 
   /** Create the right appender based on Spark configuration */
-  def apply(inputStream: InputStream, file: File, conf: SparkConf): FileAppender = {
+  def apply(
+      inputStream: InputStream,
+      file: File,
+      conf: SparkConf,
+      closeStreams: Boolean = false
+    ) : FileAppender = {
 
     val rollingStrategy = conf.get(config.EXECUTOR_LOGS_ROLLING_STRATEGY)
     val rollingSizeBytes = conf.get(config.EXECUTOR_LOGS_ROLLING_MAX_SIZE)
@@ -141,9 +156,10 @@ private[spark] object FileAppender extends Logging {
       validatedParams.map {
         case (interval, pattern) =>
           new RollingFileAppender(
-            inputStream, file, new TimeBasedRollingPolicy(interval, pattern), conf)
+            inputStream, file, new TimeBasedRollingPolicy(interval, pattern), conf,
+            closeStreams = closeStreams)
       }.getOrElse {
-        new FileAppender(inputStream, file)
+        new FileAppender(inputStream, file, closeStreams = closeStreams)
       }
     }
 
@@ -151,17 +167,18 @@ private[spark] object FileAppender extends Logging {
       rollingSizeBytes match {
         case IntParam(bytes) =>
           logInfo(s"Rolling executor logs enabled for $file with rolling every $bytes bytes")
-          new RollingFileAppender(inputStream, file, new SizeBasedRollingPolicy(bytes), conf)
+          new RollingFileAppender(
+            inputStream, file, new SizeBasedRollingPolicy(bytes), conf, closeStreams = closeStreams)
         case _ =>
           logWarning(
             s"Illegal size [$rollingSizeBytes] for rolling executor logs, rolling logs not enabled")
-          new FileAppender(inputStream, file)
+          new FileAppender(inputStream, file, closeStreams = closeStreams)
       }
     }
 
     rollingStrategy match {
       case "" =>
-        new FileAppender(inputStream, file)
+        new FileAppender(inputStream, file, closeStreams = closeStreams)
       case "time" =>
         createTimeBasedAppender()
       case "size" =>
@@ -170,7 +187,7 @@ private[spark] object FileAppender extends Logging {
         logWarning(
           s"Illegal strategy [$rollingStrategy] for rolling executor logs, " +
             s"rolling logs not enabled")
-        new FileAppender(inputStream, file)
+        new FileAppender(inputStream, file, closeStreams = closeStreams)
     }
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
index b73f422649..68a59232c7 100644
--- a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
+++ b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
@@ -36,14 +36,16 @@ import org.apache.spark.internal.config
  * @param rollingPolicy Policy based on which files will be rolled over.
  * @param conf SparkConf that is used to pass on extra configurations
  * @param bufferSize Optional buffer size. Used mainly for testing.
+ * @param closeStreams Option flag: whether to close the inputStream at the end.
  */
 private[spark] class RollingFileAppender(
     inputStream: InputStream,
     activeFile: File,
     val rollingPolicy: RollingPolicy,
     conf: SparkConf,
-    bufferSize: Int = RollingFileAppender.DEFAULT_BUFFER_SIZE
-  ) extends FileAppender(inputStream, activeFile, bufferSize) {
+    bufferSize: Int = RollingFileAppender.DEFAULT_BUFFER_SIZE,
+    closeStreams: Boolean = false
+  ) extends FileAppender(inputStream, activeFile, bufferSize, closeStreams) {
 
   private val maxRetainedFiles = conf.get(config.EXECUTOR_LOGS_ROLLING_MAX_RETAINED_FILES)
   private val enableCompression = conf.get(config.EXECUTOR_LOGS_ROLLING_ENABLE_COMPRESSION)
diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
index 12d97573ff..71010a10cb 100644
--- a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
@@ -61,6 +61,15 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
     assert(Files.toString(testFile, StandardCharsets.UTF_8) === header + testString)
   }
 
+  test("SPARK-35027: basic file appender - close stream") {
+    val inputStream = mock(classOf[InputStream])
+    val appender = new FileAppender(inputStream, testFile, closeStreams = true)
+    Thread.sleep(10)
+    appender.stop()
+    appender.awaitTermination()
+    verify(inputStream).close()
+  }
+
   test("rolling file appender - time-based rolling") {
     // setup input stream and appender
     val testOutputStream = new PipedOutputStream()
@@ -96,6 +105,32 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
       appender, testOutputStream, textToAppend, rolloverIntervalMillis, isCompressed = true)
   }
 
+  test("SPARK-35027: rolling file appender - time-based rolling close stream") {
+    val inputStream = mock(classOf[InputStream])
+    val sparkConf = new SparkConf()
+    sparkConf.set(config.EXECUTOR_LOGS_ROLLING_STRATEGY.key, "time")
+    val appender = FileAppender(inputStream, testFile, sparkConf, closeStreams = true)
+    assert(
+      appender.asInstanceOf[RollingFileAppender].rollingPolicy.isInstanceOf[TimeBasedRollingPolicy])
+    Thread.sleep(10)
+    appender.stop()
+    appender.awaitTermination()
+    verify(inputStream).close()
+  }
+
+  test("SPARK-35027: rolling file appender - size-based rolling close stream") {
+    val inputStream = mock(classOf[InputStream])
+    val sparkConf = new SparkConf()
+    sparkConf.set(config.EXECUTOR_LOGS_ROLLING_STRATEGY.key, "size")
+    val appender = FileAppender(inputStream, testFile, sparkConf, closeStreams = true)
+    assert(
+      appender.asInstanceOf[RollingFileAppender].rollingPolicy.isInstanceOf[SizeBasedRollingPolicy])
+    Thread.sleep(10)
+    appender.stop()
+    appender.awaitTermination()
+    verify(inputStream).close()
+  }
+
   test("rolling file appender - size-based rolling") {
     // setup input stream and appender
     val testOutputStream = new PipedOutputStream()
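
Usage note (illustrative, not part of the patch): the sketch below shows how a caller inside the org.apache.spark package tree, similar to the ExecutorRunner change above, might pass the new closeStreams flag so the appender also closes the child process's stdout stream when appending stops or fails. The CloseStreamsExample object, the redirectStdout helper, and its logDir parameter are hypothetical names used only for illustration; FileAppender itself is private[spark], so real callers must live under org.apache.spark.

package org.apache.spark.deploy.worker

import java.io.File

import org.apache.spark.SparkConf
import org.apache.spark.util.logging.FileAppender

object CloseStreamsExample {
  // Hypothetical helper: redirect a child process's stdout into a log file.
  // Passing closeStreams = true asks the appender to close process.getInputStream
  // in a finally block once appending ends, whether normally or after a write error,
  // instead of leaving the stream open as the pre-patch behavior did.
  def redirectStdout(process: Process, logDir: File, conf: SparkConf): FileAppender = {
    val stdout = new File(logDir, "stdout")
    FileAppender(process.getInputStream, stdout, conf, closeStreams = true)
  }
}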