[SPARK-21475][CORE] Use NIO's Files API to replace FileInputStream/FileOutputStream in some critical paths

## What changes were proposed in this pull request?

Java's `FileInputStream` and `FileOutputStream` override `finalize()`. Even when such a file input/output stream is closed correctly and promptly, it still leaves a memory footprint that only gets cleaned up during a Full GC. This introduces two side effects:

1. Many memory footprints related to `Finalizer` objects are kept in memory, which increases memory overhead. In our external shuffle service use case, a busy shuffle service accumulates a large number of these objects and can potentially run into OOM.
2. Finalizers are only invoked during Full GC, which increases the overhead of Full GC and leads to long GC pauses.

https://bugs.openjdk.java.net/browse/JDK-8080225

https://www.cloudbees.com/blog/fileinputstream-fileoutputstream-considered-harmful

So, to fix this potential issue, this PR proposes using NIO's `Files#newInputStream`/`Files#newOutputStream` instead in some critical paths such as shuffle.
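
For illustration, here is a minimal sketch of the substitution on the stream side (the `copy` helper and class name are made up for this example, not code from the patch). The streams returned by `Files.newInputStream`/`Files.newOutputStream` do not override `finalize()`, so no finalizer work is left behind:

```java
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;

public class NioStreamSketch {

  // Hypothetical helper, only to show the replacement pattern.
  public static void copy(File src, File dst) throws IOException {
    // Before: new FileInputStream(src) / new FileOutputStream(dst), both of which
    // override finalize() and leave work for the finalizer thread (see JDK-8080225).
    try (InputStream in = Files.newInputStream(src.toPath());
         OutputStream out = Files.newOutputStream(dst.toPath())) {
      byte[] buffer = new byte[8192];
      int read;
      while ((read = in.read(buffer)) != -1) {
        out.write(buffer, 0, read);
      }
    }
  }
}
```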

The following `FileInputStream` usages in core were left unchanged, which I think are not so critical:

```
./core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala:467:    val file = new DataInputStream(new FileInputStream(filename))
./core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala:942:    val in = new FileInputStream(new File(path))
./core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala:76:    val fileIn = new FileInputStream(file)
./core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala:248:        val fis = new FileInputStream(file)
./core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:910:                input = new FileInputStream(new File(t))
./core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala:20:import java.io.{FileInputStream, InputStream}
./core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala:132:        case Some(f) => new FileInputStream(f)
./core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala:20:import java.io.{FileInputStream, InputStream}
./core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala:77:        val fis = new FileInputStream(f)
./core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala:27:import org.apache.spark.io.NioBufferedFileInputStream
./core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala:94:      new DataInputStream(new NioBufferedFileInputStream(index))
./core/src/main/scala/org/apache/spark/storage/DiskStore.scala:111:        val channel = new FileInputStream(file).getChannel()
./core/src/main/scala/org/apache/spark/storage/DiskStore.scala:219:    val channel = new FileInputStream(file).getChannel()
./core/src/main/scala/org/apache/spark/TestUtils.scala:20:import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream}
./core/src/main/scala/org/apache/spark/TestUtils.scala:106:      val in = new FileInputStream(file)
./core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala:89:        inputStream = new FileInputStream(activeFile)
./core/src/main/scala/org/apache/spark/util/Utils.scala:329:      if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]
./core/src/main/scala/org/apache/spark/util/Utils.scala:332:        val inChannel = in.asInstanceOf[FileInputStream].getChannel()
./core/src/main/scala/org/apache/spark/util/Utils.scala:1533:      gzInputStream = new GZIPInputStream(new FileInputStream(file))
./core/src/main/scala/org/apache/spark/util/Utils.scala:1560:      new GZIPInputStream(new FileInputStream(file))
./core/src/main/scala/org/apache/spark/util/Utils.scala:1562:      new FileInputStream(file)
./core/src/main/scala/org/apache/spark/util/Utils.scala:2090:    val inReader = new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8)
```

The following `FileOutputStream` usages in core were left unchanged:

```
./core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala:957:    val out = new FileOutputStream(file)
./core/src/main/scala/org/apache/spark/api/r/RBackend.scala:20:import java.io.{DataOutputStream, File, FileOutputStream, IOException}
./core/src/main/scala/org/apache/spark/api/r/RBackend.scala:131:      val dos = new DataOutputStream(new FileOutputStream(f))
./core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala:62:    val fileOut = new FileOutputStream(file)
./core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala:160:          val outStream = new FileOutputStream(outPath)
./core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala:239:    val zipOutputStream = new ZipOutputStream(new FileOutputStream(zipFile, false))
./core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:949:        val out = new FileOutputStream(tempFile)
./core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala:20:import java.io.{File, FileOutputStream, InputStream, IOException}
./core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala:106:    val out = new FileOutputStream(file, true)
./core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala:109:     * Therefore, for local files, use FileOutputStream instead. */
./core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala:112:        new FileOutputStream(uri.getPath)
./core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:20:import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStream}
./core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:71:  private var fos: FileOutputStream = null
./core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:102:    fos = new FileOutputStream(file, true)
./core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:213:      var truncateStream: FileOutputStream = null
./core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:215:        truncateStream = new FileOutputStream(file, true)
./core/src/main/scala/org/apache/spark/storage/DiskStore.scala:153:    val out = new FileOutputStream(file).getChannel()
./core/src/main/scala/org/apache/spark/TestUtils.scala:20:import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream}
./core/src/main/scala/org/apache/spark/TestUtils.scala:81:    val jarStream = new JarOutputStream(new FileOutputStream(jarFile))
./core/src/main/scala/org/apache/spark/TestUtils.scala:96:    val jarFileStream = new FileOutputStream(jarFile)
./core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala:20:import java.io.{File, FileOutputStream, InputStream, IOException}
./core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala:31:  volatile private var outputStream: FileOutputStream = null
./core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala:97:    outputStream = new FileOutputStream(file, true)
./core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala:90:        gzOutputStream = new GZIPOutputStream(new FileOutputStream(gzFile))
./core/src/main/scala/org/apache/spark/util/Utils.scala:329:      if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]
./core/src/main/scala/org/apache/spark/util/Utils.scala:333:        val outChannel = out.asInstanceOf[FileOutputStream].getChannel()
./core/src/main/scala/org/apache/spark/util/Utils.scala:527:      val out = new FileOutputStream(tempFile)
```

`DiskBlockObjectWriter` uses `FileDescriptor`, so it is not easy to change it to the NIO Files API.
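
As a minimal sketch of why the `FileDescriptor` dependency matters (illustrative only, not the actual `DiskBlockObjectWriter` code): `FileOutputStream` exposes the underlying descriptor via `getFD()`, for example to force written bytes to disk, while the `OutputStream` returned by `Files.newOutputStream` provides no such handle:

```java
import java.io.File;
import java.io.FileDescriptor;
import java.io.FileOutputStream;
import java.io.IOException;

public class FileDescriptorSketch {

  // Hypothetical method, only to illustrate the getFD()/sync() pattern.
  public static void writeAndSync(File file, byte[] data) throws IOException {
    try (FileOutputStream fos = new FileOutputStream(file, true)) {
      fos.write(data);
      // The descriptor below has no equivalent on the stream returned by
      // Files.newOutputStream(), which is why call sites that need it are
      // harder to migrate to the NIO Files API.
      FileDescriptor fd = fos.getFD();
      fd.sync();
    }
  }
}
```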

All the `FileInputStream` and `FileOutputStream` usages in common/shuffle* have been changed.
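
The channel-based call sites follow the same idea; below is a minimal sketch of the before/after shape used in the shuffle writers (the class and method names are illustrative), mirroring the `FileChannel.open` hunks in the diff:

```java
import java.io.File;
import java.io.IOException;
import java.nio.channels.FileChannel;

import static java.nio.file.StandardOpenOption.APPEND;
import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.READ;
import static java.nio.file.StandardOpenOption.WRITE;

public class FileChannelSketch {

  // Hypothetical helper showing FileChannel.open as the replacement for
  // new FileInputStream(...).getChannel() / new FileOutputStream(..., true).getChannel().
  public static long appendFile(File partitionFile, File outputFile) throws IOException {
    try (FileChannel in = FileChannel.open(partitionFile.toPath(), READ);
         FileChannel out = FileChannel.open(outputFile.toPath(), WRITE, APPEND, CREATE)) {
      long size = in.size();
      long position = 0;
      while (position < size) {
        position += in.transferTo(position, size - position, out);
      }
      return size;
    }
  }
}
```

The `WRITE, APPEND, CREATE` combination matches what the patch uses for the merged output file, keeping the append-mode workaround for the transferTo kernel bug noted in SPARK-3948.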

## How was this patch tested?

Existing tests and manual verification.

Author: jerryshao <sshao@hortonworks.com>

Closes #18684 from jerryshao/SPARK-21475.
jerryshao 2017-08-01 10:23:45 +01:00 committed by Sean Owen
parent 110695db70
commit 5fd0294ff8
8 changed files with 49 additions and 37 deletions

View file

@@ -18,12 +18,13 @@
package org.apache.spark.network.buffer;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import com.google.common.base.Objects;
import com.google.common.io.ByteStreams;
@@ -93,9 +94,9 @@ public final class FileSegmentManagedBuffer extends ManagedBuffer {
@Override
public InputStream createInputStream() throws IOException {
FileInputStream is = null;
InputStream is = null;
try {
is = new FileInputStream(file);
is = Files.newInputStream(file.toPath());
ByteStreams.skipFully(is, offset);
return new LimitedInputStream(is, length);
} catch (IOException e) {
@@ -132,7 +133,7 @@ public final class FileSegmentManagedBuffer extends ManagedBuffer {
if (conf.lazyFileDescriptor()) {
return new DefaultFileRegion(file, offset, length);
} else {
FileChannel fileChannel = new FileInputStream(file).getChannel();
FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
return new DefaultFileRegion(fileChannel, offset, length);
}
}

View file

@@ -18,11 +18,11 @@
package org.apache.spark.network.shuffle;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.util.Arrays;
import org.slf4j.Logger;
@@ -165,7 +165,7 @@ public class OneForOneBlockFetcher {
DownloadCallback(int chunkIndex) throws IOException {
this.targetFile = tempShuffleFileManager.createTempShuffleFile();
this.channel = Channels.newChannel(new FileOutputStream(targetFile));
this.channel = Channels.newChannel(Files.newOutputStream(targetFile.toPath()));
this.chunkIndex = chunkIndex;
}

View file

@@ -19,10 +19,10 @@ package org.apache.spark.network.shuffle;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.LongBuffer;
import java.nio.file.Files;
/**
* Keeps the index information for a particular map output
@@ -38,7 +38,7 @@ public class ShuffleIndexInformation {
offsets = buffer.asLongBuffer();
DataInputStream dis = null;
try {
dis = new DataInputStream(new FileInputStream(indexFile));
dis = new DataInputStream(Files.newInputStream(indexFile.toPath()));
dis.readFully(buffer.array());
} finally {
if (dis != null) {

View file

@@ -18,9 +18,9 @@
package org.apache.spark.shuffle.sort;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
import static java.nio.file.StandardOpenOption.*;
import javax.annotation.Nullable;
import scala.None$;
@@ -75,7 +75,6 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
private static final Logger logger = LoggerFactory.getLogger(BypassMergeSortShuffleWriter.class);
private final int fileBufferSize;
private final boolean transferToEnabled;
private final int numPartitions;
private final BlockManager blockManager;
private final Partitioner partitioner;
@@ -107,7 +106,6 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
SparkConf conf) {
// Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
this.fileBufferSize = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true);
this.blockManager = blockManager;
final ShuffleDependency<K, V, V> dep = handle.dependency();
this.mapId = mapId;
@@ -188,17 +186,21 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
return lengths;
}
final FileOutputStream out = new FileOutputStream(outputFile, true);
// This file needs to opened in append mode in order to work around a Linux kernel bug that
// affects transferTo; see SPARK-3948 for more details.
final FileChannel out = FileChannel.open(outputFile.toPath(), WRITE, APPEND, CREATE);
final long writeStartTime = System.nanoTime();
boolean threwException = true;
try {
for (int i = 0; i < numPartitions; i++) {
final File file = partitionWriterSegments[i].file();
if (file.exists()) {
final FileInputStream in = new FileInputStream(file);
final FileChannel in = FileChannel.open(file.toPath(), READ);
boolean copyThrewException = true;
try {
lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);
long size = in.size();
Utils.copyFileStreamNIO(in, out, 0, size);
lengths[i] = size;
copyThrewException = false;
} finally {
Closeables.close(in, copyThrewException);

View file

@@ -20,6 +20,7 @@ package org.apache.spark.shuffle.sort;
import javax.annotation.Nullable;
import java.io.*;
import java.nio.channels.FileChannel;
import static java.nio.file.StandardOpenOption.*;
import java.util.Iterator;
import scala.Option;
@@ -290,7 +291,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
final boolean encryptionEnabled = blockManager.serializerManager().encryptionEnabled();
try {
if (spills.length == 0) {
new FileOutputStream(outputFile).close(); // Create an empty file
java.nio.file.Files.newOutputStream(outputFile.toPath()).close(); // Create an empty file
return new long[partitioner.numPartitions()];
} else if (spills.length == 1) {
// Here, we don't need to perform any metrics updates because the bytes written to this
@@ -367,7 +368,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
final InputStream[] spillInputStreams = new InputStream[spills.length];
final OutputStream bos = new BufferedOutputStream(
new FileOutputStream(outputFile),
java.nio.file.Files.newOutputStream(outputFile.toPath()),
outputBufferSizeInBytes);
// Use a counting output stream to avoid having to close the underlying file and ask
// the file system for its size after each partition is written.
@@ -442,11 +443,11 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
boolean threwException = true;
try {
for (int i = 0; i < spills.length; i++) {
spillInputChannels[i] = new FileInputStream(spills[i].file).getChannel();
spillInputChannels[i] = FileChannel.open(spills[i].file.toPath(), READ);
}
// This file needs to opened in append mode in order to work around a Linux kernel bug that
// affects transferTo; see SPARK-3948 for more details.
mergedFileOutputChannel = new FileOutputStream(outputFile, true).getChannel();
mergedFileOutputChannel = FileChannel.open(outputFile.toPath(), WRITE, CREATE, APPEND);
long bytesWrittenToMergedFile = 0;
for (int partition = 0; partition < numPartitions; partition++) {

View file

@@ -18,6 +18,7 @@
package org.apache.spark.shuffle
import java.io._
import java.nio.file.Files
import com.google.common.io.ByteStreams
@@ -141,7 +142,8 @@ private[spark] class IndexShuffleBlockResolver(
val indexFile = getIndexFile(shuffleId, mapId)
val indexTmp = Utils.tempFileWith(indexFile)
try {
val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
val out = new DataOutputStream(
new BufferedOutputStream(Files.newOutputStream(indexTmp.toPath)))
Utils.tryWithSafeFinally {
// We take in lengths of each block, need to convert it to offsets.
var offset = 0L
@@ -196,7 +198,7 @@ private[spark] class IndexShuffleBlockResolver(
// find out the consolidated file, then the offset within that from our index
val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
val in = new DataInputStream(new FileInputStream(indexFile))
val in = new DataInputStream(Files.newInputStream(indexFile.toPath))
try {
ByteStreams.skipFully(in, blockId.reduceId * 8)
val offset = in.readLong()

View file

@@ -18,6 +18,8 @@
package org.apache.spark.util.collection
import java.io._
import java.nio.channels.{Channels, FileChannel}
import java.nio.file.StandardOpenOption
import java.util.Comparator
import scala.collection.BufferedIterator
@@ -460,7 +462,7 @@ class ExternalAppendOnlyMap[K, V, C](
)
private var batchIndex = 0 // Which batch we're in
private var fileStream: FileInputStream = null
private var fileChannel: FileChannel = null
// An intermediate stream that reads from exactly one batch
// This guards against pre-fetching and other arbitrary behavior of higher level streams
@@ -477,14 +479,14 @@ class ExternalAppendOnlyMap[K, V, C](
if (batchIndex < batchOffsets.length - 1) {
if (deserializeStream != null) {
deserializeStream.close()
fileStream.close()
fileChannel.close()
deserializeStream = null
fileStream = null
fileChannel = null
}
val start = batchOffsets(batchIndex)
fileStream = new FileInputStream(file)
fileStream.getChannel.position(start)
fileChannel = FileChannel.open(file.toPath, StandardOpenOption.READ)
fileChannel.position(start)
batchIndex += 1
val end = batchOffsets(batchIndex)
@@ -492,7 +494,8 @@ class ExternalAppendOnlyMap[K, V, C](
assert(end >= start, "start = " + start + ", end = " + end +
", batchOffsets = " + batchOffsets.mkString("[", ", ", "]"))
val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
val bufferedStream = new BufferedInputStream(
ByteStreams.limit(Channels.newInputStream(fileChannel), end - start))
val wrappedStream = serializerManager.wrapStream(blockId, bufferedStream)
ser.deserializeStream(wrappedStream)
} else {
@@ -552,9 +555,9 @@ class ExternalAppendOnlyMap[K, V, C](
ds.close()
deserializeStream = null
}
if (fileStream != null) {
fileStream.close()
fileStream = null
if (fileChannel != null) {
fileChannel.close()
fileChannel = null
}
if (file.exists()) {
if (!file.delete()) {

View file

@@ -18,6 +18,8 @@
package org.apache.spark.util.collection
import java.io._
import java.nio.channels.{Channels, FileChannel}
import java.nio.file.StandardOpenOption
import java.util.Comparator
import scala.collection.mutable
@@ -492,7 +494,7 @@ private[spark] class ExternalSorter[K, V, C](
// Intermediate file and deserializer streams that read from exactly one batch
// This guards against pre-fetching and other arbitrary behavior of higher level streams
var fileStream: FileInputStream = null
var fileChannel: FileChannel = null
var deserializeStream = nextBatchStream() // Also sets fileStream
var nextItem: (K, C) = null
@@ -505,14 +507,14 @@ private[spark] class ExternalSorter[K, V, C](
if (batchId < batchOffsets.length - 1) {
if (deserializeStream != null) {
deserializeStream.close()
fileStream.close()
fileChannel.close()
deserializeStream = null
fileStream = null
fileChannel = null
}
val start = batchOffsets(batchId)
fileStream = new FileInputStream(spill.file)
fileStream.getChannel.position(start)
fileChannel = FileChannel.open(spill.file.toPath, StandardOpenOption.READ)
fileChannel.position(start)
batchId += 1
val end = batchOffsets(batchId)
@@ -520,7 +522,8 @@ private[spark] class ExternalSorter[K, V, C](
assert(end >= start, "start = " + start + ", end = " + end +
", batchOffsets = " + batchOffsets.mkString("[", ", ", "]"))
val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
val bufferedStream = new BufferedInputStream(
ByteStreams.limit(Channels.newInputStream(fileChannel), end - start))
val wrappedStream = serializerManager.wrapStream(spill.blockId, bufferedStream)
serInstance.deserializeStream(wrappedStream)
@@ -610,7 +613,7 @@ private[spark] class ExternalSorter[K, V, C](
batchId = batchOffsets.length // Prevent reading any other batch
val ds = deserializeStream
deserializeStream = null
fileStream = null
fileChannel = null
if (ds != null) {
ds.close()
}