[SPARK-21475][Core] Revert "[SPARK-21475][CORE] Use NIO's Files API to replace FileInputStream/FileOutputStream in some critical paths"

## What changes were proposed in this pull request?

This reverts commit 5fd0294ff8 because of a huge performance regression.
I manually fixed a minor conflict in `OneForOneBlockFetcher.java`.

`Files.newInputStream` returns a `sun.nio.ch.ChannelInputStream`. `ChannelInputStream` doesn't override `InputStream.skip`, so it falls back to the default `InputStream.skip`, which reads and discards the skipped bytes instead of seeking. This causes a huge performance regression when reading shuffle files, because `FileSegmentManagedBuffer` calls `ByteStreams.skipFully(is, offset)` to position the stream at the start of the requested segment.
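
To make the regression concrete, here is a minimal sketch (not part of this patch; the file path, offset, and class name are made up) that contrasts the two skip behaviors. Spark reaches the slow path through Guava's `ByteStreams.skipFully`, which relies on `InputStream.skip` under the hood:

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;

public class SkipComparison {
  public static void main(String[] args) throws IOException {
    // Hypothetical shuffle file and segment offset, for illustration only.
    String path = "/tmp/shuffle_0_0_0.data";
    long offset = 512L * 1024 * 1024;

    // NIO stream: sun.nio.ch.ChannelInputStream does not override skip(), so the
    // default InputStream.skip() reads the skipped bytes into a scratch buffer and
    // discards them; the cost grows with the offset.
    try (InputStream nioIn = Files.newInputStream(Paths.get(path))) {
      long skipped = nioIn.skip(offset);
      System.out.println("NIO stream skipped " + skipped + " bytes by reading them");
    }

    // FileInputStream.skip() seeks the underlying file descriptor instead, so no
    // data is read regardless of how large the offset is.
    try (FileInputStream fileIn = new FileInputStream(path)) {
      long skipped = fileIn.skip(offset);
      System.out.println("FileInputStream skipped " + skipped + " bytes without reading");
    }
  }
}
```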

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <zsxwing@gmail.com>

Closes #20119 from zsxwing/revert-SPARK-21475.
Shixiong Zhu committed on 2017-12-29 22:33:29 -08:00
commit 14c4a62c12 (parent f2b3525c17)
8 changed files with 37 additions and 49 deletions

FileSegmentManagedBuffer.java

@@ -18,13 +18,12 @@
 package org.apache.spark.network.buffer;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
-import java.nio.file.Files;
-import java.nio.file.StandardOpenOption;
 
 import com.google.common.base.Objects;
 import com.google.common.io.ByteStreams;
@@ -94,9 +93,9 @@ public final class FileSegmentManagedBuffer extends ManagedBuffer {
 
   @Override
   public InputStream createInputStream() throws IOException {
-    InputStream is = null;
+    FileInputStream is = null;
     try {
-      is = Files.newInputStream(file.toPath());
+      is = new FileInputStream(file);
       ByteStreams.skipFully(is, offset);
       return new LimitedInputStream(is, length);
     } catch (IOException e) {
@@ -133,7 +132,7 @@ public final class FileSegmentManagedBuffer extends ManagedBuffer {
     if (conf.lazyFileDescriptor()) {
       return new DefaultFileRegion(file, offset, length);
     } else {
-      FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
+      FileChannel fileChannel = new FileInputStream(file).getChannel();
      return new DefaultFileRegion(fileChannel, offset, length);
     }
   }

OneForOneBlockFetcher.java

@@ -18,11 +18,11 @@
 package org.apache.spark.network.shuffle;
 
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
-import java.nio.file.Files;
 import java.util.Arrays;
 
 import org.slf4j.Logger;
@@ -165,7 +165,7 @@
     DownloadCallback(int chunkIndex) throws IOException {
       this.targetFile = tempFileManager.createTempFile();
-      this.channel = Channels.newChannel(Files.newOutputStream(targetFile.toPath()));
+      this.channel = Channels.newChannel(new FileOutputStream(targetFile));
       this.chunkIndex = chunkIndex;
     }

ShuffleIndexInformation.java

@@ -19,10 +19,10 @@ package org.apache.spark.network.shuffle;
 
 import java.io.DataInputStream;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.LongBuffer;
-import java.nio.file.Files;
 
 /**
  * Keeps the index information for a particular map output
@@ -39,7 +39,7 @@ public class ShuffleIndexInformation {
     offsets = buffer.asLongBuffer();
     DataInputStream dis = null;
     try {
-      dis = new DataInputStream(Files.newInputStream(indexFile.toPath()));
+      dis = new DataInputStream(new FileInputStream(indexFile));
       dis.readFully(buffer.array());
     } finally {
       if (dis != null) {

BypassMergeSortShuffleWriter.java

@@ -18,9 +18,9 @@
 package org.apache.spark.shuffle.sort;
 
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
-import java.nio.channels.FileChannel;
-import static java.nio.file.StandardOpenOption.*;
 import javax.annotation.Nullable;
 
 import scala.None$;
@@ -75,6 +75,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
   private static final Logger logger = LoggerFactory.getLogger(BypassMergeSortShuffleWriter.class);
 
   private final int fileBufferSize;
+  private final boolean transferToEnabled;
   private final int numPartitions;
   private final BlockManager blockManager;
   private final Partitioner partitioner;
@@ -106,6 +107,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
       SparkConf conf) {
     // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
     this.fileBufferSize = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
+    this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true);
     this.blockManager = blockManager;
     final ShuffleDependency<K, V, V> dep = handle.dependency();
     this.mapId = mapId;
@@ -186,21 +188,17 @@
       return lengths;
     }
 
-    // This file needs to opened in append mode in order to work around a Linux kernel bug that
-    // affects transferTo; see SPARK-3948 for more details.
-    final FileChannel out = FileChannel.open(outputFile.toPath(), WRITE, APPEND, CREATE);
+    final FileOutputStream out = new FileOutputStream(outputFile, true);
     final long writeStartTime = System.nanoTime();
     boolean threwException = true;
     try {
       for (int i = 0; i < numPartitions; i++) {
         final File file = partitionWriterSegments[i].file();
         if (file.exists()) {
-          final FileChannel in = FileChannel.open(file.toPath(), READ);
+          final FileInputStream in = new FileInputStream(file);
           boolean copyThrewException = true;
           try {
-            long size = in.size();
-            Utils.copyFileStreamNIO(in, out, 0, size);
-            lengths[i] = size;
+            lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);
             copyThrewException = false;
           } finally {
             Closeables.close(in, copyThrewException);

UnsafeShuffleWriter.java

@@ -20,7 +20,6 @@ package org.apache.spark.shuffle.sort;
 import javax.annotation.Nullable;
 import java.io.*;
 import java.nio.channels.FileChannel;
-import static java.nio.file.StandardOpenOption.*;
 import java.util.Iterator;
 
 import scala.Option;
@@ -291,7 +290,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
     final boolean encryptionEnabled = blockManager.serializerManager().encryptionEnabled();
     try {
       if (spills.length == 0) {
-        java.nio.file.Files.newOutputStream(outputFile.toPath()).close(); // Create an empty file
+        new FileOutputStream(outputFile).close(); // Create an empty file
         return new long[partitioner.numPartitions()];
       } else if (spills.length == 1) {
         // Here, we don't need to perform any metrics updates because the bytes written to this
@@ -368,7 +367,7 @@
     final InputStream[] spillInputStreams = new InputStream[spills.length];
 
     final OutputStream bos = new BufferedOutputStream(
-      java.nio.file.Files.newOutputStream(outputFile.toPath()),
+      new FileOutputStream(outputFile),
       outputBufferSizeInBytes);
     // Use a counting output stream to avoid having to close the underlying file and ask
     // the file system for its size after each partition is written.
@@ -443,11 +442,11 @@
     boolean threwException = true;
     try {
       for (int i = 0; i < spills.length; i++) {
-        spillInputChannels[i] = FileChannel.open(spills[i].file.toPath(), READ);
+        spillInputChannels[i] = new FileInputStream(spills[i].file).getChannel();
       }
       // This file needs to opened in append mode in order to work around a Linux kernel bug that
       // affects transferTo; see SPARK-3948 for more details.
-      mergedFileOutputChannel = FileChannel.open(outputFile.toPath(), WRITE, CREATE, APPEND);
+      mergedFileOutputChannel = new FileOutputStream(outputFile, true).getChannel();
 
       long bytesWrittenToMergedFile = 0;
       for (int partition = 0; partition < numPartitions; partition++) {

IndexShuffleBlockResolver.scala

@@ -18,7 +18,6 @@
 package org.apache.spark.shuffle
 
 import java.io._
-import java.nio.file.Files
 
 import com.google.common.io.ByteStreams
@@ -142,8 +141,7 @@ private[spark] class IndexShuffleBlockResolver(
     val indexFile = getIndexFile(shuffleId, mapId)
     val indexTmp = Utils.tempFileWith(indexFile)
     try {
-      val out = new DataOutputStream(
-        new BufferedOutputStream(Files.newOutputStream(indexTmp.toPath)))
+      val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
       Utils.tryWithSafeFinally {
         // We take in lengths of each block, need to convert it to offsets.
         var offset = 0L
@@ -198,7 +196,7 @@ private[spark] class IndexShuffleBlockResolver(
     // find out the consolidated file, then the offset within that from our index
     val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
 
-    val in = new DataInputStream(Files.newInputStream(indexFile.toPath))
+    val in = new DataInputStream(new FileInputStream(indexFile))
     try {
       ByteStreams.skipFully(in, blockId.reduceId * 8)
       val offset = in.readLong()

ExternalAppendOnlyMap.scala

@@ -18,8 +18,6 @@
 package org.apache.spark.util.collection
 
 import java.io._
-import java.nio.channels.{Channels, FileChannel}
-import java.nio.file.StandardOpenOption
 import java.util.Comparator
 
 import scala.collection.BufferedIterator
@@ -461,7 +459,7 @@ class ExternalAppendOnlyMap[K, V, C](
     )
 
     private var batchIndex = 0  // Which batch we're in
-    private var fileChannel: FileChannel = null
+    private var fileStream: FileInputStream = null
 
     // An intermediate stream that reads from exactly one batch
     // This guards against pre-fetching and other arbitrary behavior of higher level streams
@@ -478,14 +476,14 @@ class ExternalAppendOnlyMap[K, V, C](
       if (batchIndex < batchOffsets.length - 1) {
         if (deserializeStream != null) {
           deserializeStream.close()
-          fileChannel.close()
+          fileStream.close()
           deserializeStream = null
-          fileChannel = null
+          fileStream = null
         }
 
         val start = batchOffsets(batchIndex)
-        fileChannel = FileChannel.open(file.toPath, StandardOpenOption.READ)
-        fileChannel.position(start)
+        fileStream = new FileInputStream(file)
+        fileStream.getChannel.position(start)
         batchIndex += 1
 
         val end = batchOffsets(batchIndex)
@@ -493,8 +491,7 @@ class ExternalAppendOnlyMap[K, V, C](
         assert(end >= start, "start = " + start + ", end = " + end +
           ", batchOffsets = " + batchOffsets.mkString("[", ", ", "]"))
 
-        val bufferedStream = new BufferedInputStream(
-          ByteStreams.limit(Channels.newInputStream(fileChannel), end - start))
+        val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
         val wrappedStream = serializerManager.wrapStream(blockId, bufferedStream)
         ser.deserializeStream(wrappedStream)
       } else {
@@ -554,9 +551,9 @@ class ExternalAppendOnlyMap[K, V, C](
         ds.close()
         deserializeStream = null
       }
-      if (fileChannel != null) {
-        fileChannel.close()
-        fileChannel = null
+      if (fileStream != null) {
+        fileStream.close()
+        fileStream = null
       }
       if (file.exists()) {
         if (!file.delete()) {

ExternalSorter.scala

@@ -18,8 +18,6 @@
 package org.apache.spark.util.collection
 
 import java.io._
-import java.nio.channels.{Channels, FileChannel}
-import java.nio.file.StandardOpenOption
 import java.util.Comparator
 
 import scala.collection.mutable
@@ -494,7 +492,7 @@ private[spark] class ExternalSorter[K, V, C](
     // Intermediate file and deserializer streams that read from exactly one batch
     // This guards against pre-fetching and other arbitrary behavior of higher level streams
-    var fileChannel: FileChannel = null
+    var fileStream: FileInputStream = null
     var deserializeStream = nextBatchStream()  // Also sets fileStream
 
     var nextItem: (K, C) = null
@@ -507,14 +505,14 @@ private[spark] class ExternalSorter[K, V, C](
      if (batchId < batchOffsets.length - 1) {
        if (deserializeStream != null) {
          deserializeStream.close()
-          fileChannel.close()
+          fileStream.close()
          deserializeStream = null
-          fileChannel = null
+          fileStream = null
        }
 
        val start = batchOffsets(batchId)
-        fileChannel = FileChannel.open(spill.file.toPath, StandardOpenOption.READ)
-        fileChannel.position(start)
+        fileStream = new FileInputStream(spill.file)
+        fileStream.getChannel.position(start)
        batchId += 1
 
        val end = batchOffsets(batchId)
@@ -522,8 +520,7 @@ private[spark] class ExternalSorter[K, V, C](
        assert(end >= start, "start = " + start + ", end = " + end +
          ", batchOffsets = " + batchOffsets.mkString("[", ", ", "]"))
 
-        val bufferedStream = new BufferedInputStream(
-          ByteStreams.limit(Channels.newInputStream(fileChannel), end - start))
+        val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
 
        val wrappedStream = serializerManager.wrapStream(spill.blockId, bufferedStream)
        serInstance.deserializeStream(wrappedStream)
@@ -613,7 +610,7 @@ private[spark] class ExternalSorter[K, V, C](
      batchId = batchOffsets.length  // Prevent reading any other batch
      val ds = deserializeStream
      deserializeStream = null
-      fileChannel = null
+      fileStream = null
      if (ds != null) {
        ds.close()
      }