diff --git a/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java b/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
index ea9b3ce4e3..c20fab83c3 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
@@ -18,13 +18,12 @@
 package org.apache.spark.network.buffer;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
-import java.nio.file.Files;
-import java.nio.file.StandardOpenOption;
 
 import com.google.common.base.Objects;
 import com.google.common.io.ByteStreams;
@@ -94,9 +93,9 @@ public final class FileSegmentManagedBuffer extends ManagedBuffer {
 
   @Override
   public InputStream createInputStream() throws IOException {
-    InputStream is = null;
+    FileInputStream is = null;
     try {
-      is = Files.newInputStream(file.toPath());
+      is = new FileInputStream(file);
       ByteStreams.skipFully(is, offset);
       return new LimitedInputStream(is, length);
     } catch (IOException e) {
@@ -133,7 +132,7 @@ public final class FileSegmentManagedBuffer extends ManagedBuffer {
     if (conf.lazyFileDescriptor()) {
       return new DefaultFileRegion(file, offset, length);
     } else {
-      FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
+      FileChannel fileChannel = new FileInputStream(file).getChannel();
       return new DefaultFileRegion(fileChannel, offset, length);
     }
   }
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
index 3f2f20b414..9cac7d00cc 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
@@ -18,11 +18,11 @@
 package org.apache.spark.network.shuffle;
 
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
-import java.nio.file.Files;
 import java.util.Arrays;
 
 import org.slf4j.Logger;
@@ -165,7 +165,7 @@ public class OneForOneBlockFetcher {
 
     DownloadCallback(int chunkIndex) throws IOException {
       this.targetFile = tempFileManager.createTempFile();
-      this.channel = Channels.newChannel(Files.newOutputStream(targetFile.toPath()));
+      this.channel = Channels.newChannel(new FileOutputStream(targetFile));
       this.chunkIndex = chunkIndex;
     }
 
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java
index 386738ece5..eacf485344 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java
@@ -19,10 +19,10 @@ package org.apache.spark.network.shuffle;
 
 import java.io.DataInputStream;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.LongBuffer;
-import java.nio.file.Files;
 
 /**
  * Keeps the index information for a particular map output
@@ -39,7 +39,7 @@ public class ShuffleIndexInformation {
     offsets = buffer.asLongBuffer();
     DataInputStream dis = null;
     try {
-      dis = new DataInputStream(Files.newInputStream(indexFile.toPath()));
+      dis = new DataInputStream(new FileInputStream(indexFile));
       dis.readFully(buffer.array());
     } finally {
       if (dis != null) {
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index a9b5236ab8..323a5d3c52 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -18,9 +18,9 @@
 package org.apache.spark.shuffle.sort;
 
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
-import java.nio.channels.FileChannel;
-import static java.nio.file.StandardOpenOption.*;
 import javax.annotation.Nullable;
 
 import scala.None$;
@@ -75,6 +75,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
   private static final Logger logger = LoggerFactory.getLogger(BypassMergeSortShuffleWriter.class);
 
   private final int fileBufferSize;
+  private final boolean transferToEnabled;
   private final int numPartitions;
   private final BlockManager blockManager;
   private final Partitioner partitioner;
@@ -106,6 +107,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
       SparkConf conf) {
     // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
     this.fileBufferSize = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
+    this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true);
     this.blockManager = blockManager;
     final ShuffleDependency<K, V, V> dep = handle.dependency();
     this.mapId = mapId;
@@ -186,21 +188,17 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
       return lengths;
     }
 
-    // This file needs to opened in append mode in order to work around a Linux kernel bug that
-    // affects transferTo; see SPARK-3948 for more details.
-    final FileChannel out = FileChannel.open(outputFile.toPath(), WRITE, APPEND, CREATE);
+    final FileOutputStream out = new FileOutputStream(outputFile, true);
     final long writeStartTime = System.nanoTime();
     boolean threwException = true;
     try {
       for (int i = 0; i < numPartitions; i++) {
         final File file = partitionWriterSegments[i].file();
         if (file.exists()) {
-          final FileChannel in = FileChannel.open(file.toPath(), READ);
+          final FileInputStream in = new FileInputStream(file);
           boolean copyThrewException = true;
           try {
-            long size = in.size();
-            Utils.copyFileStreamNIO(in, out, 0, size);
-            lengths[i] = size;
+            lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);
             copyThrewException = false;
           } finally {
             Closeables.close(in, copyThrewException);
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index e9c2a69c47..4839d04522 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -20,7 +20,6 @@ package org.apache.spark.shuffle.sort;
 import javax.annotation.Nullable;
 import java.io.*;
 import java.nio.channels.FileChannel;
-import static java.nio.file.StandardOpenOption.*;
 import java.util.Iterator;
 
 import scala.Option;
@@ -291,7 +290,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
     final boolean encryptionEnabled = blockManager.serializerManager().encryptionEnabled();
     try {
       if (spills.length == 0) {
-        java.nio.file.Files.newOutputStream(outputFile.toPath()).close(); // Create an empty file
+        new FileOutputStream(outputFile).close(); // Create an empty file
         return new long[partitioner.numPartitions()];
       } else if (spills.length == 1) {
         // Here, we don't need to perform any metrics updates because the bytes written to this
@@ -368,7 +367,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
     final InputStream[] spillInputStreams = new InputStream[spills.length];
 
     final OutputStream bos = new BufferedOutputStream(
-        java.nio.file.Files.newOutputStream(outputFile.toPath()),
+        new FileOutputStream(outputFile),
         outputBufferSizeInBytes);
     // Use a counting output stream to avoid having to close the underlying file and ask
     // the file system for its size after each partition is written.
@@ -443,11 +442,11 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
     boolean threwException = true;
     try {
       for (int i = 0; i < spills.length; i++) {
-        spillInputChannels[i] = FileChannel.open(spills[i].file.toPath(), READ);
+        spillInputChannels[i] = new FileInputStream(spills[i].file).getChannel();
       }
       // This file needs to opened in append mode in order to work around a Linux kernel bug that
       // affects transferTo; see SPARK-3948 for more details.
-      mergedFileOutputChannel = FileChannel.open(outputFile.toPath(), WRITE, CREATE, APPEND);
+      mergedFileOutputChannel = new FileOutputStream(outputFile, true).getChannel();
 
       long bytesWrittenToMergedFile = 0;
       for (int partition = 0; partition < numPartitions; partition++) {
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index 94a3a78e94..1554048517 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.shuffle
 
 import java.io._
-import java.nio.file.Files
 
 import com.google.common.io.ByteStreams
 
@@ -142,8 +141,7 @@ private[spark] class IndexShuffleBlockResolver(
     val indexFile = getIndexFile(shuffleId, mapId)
     val indexTmp = Utils.tempFileWith(indexFile)
     try {
-      val out = new DataOutputStream(
-        new BufferedOutputStream(Files.newOutputStream(indexTmp.toPath)))
+      val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
       Utils.tryWithSafeFinally {
         // We take in lengths of each block, need to convert it to offsets.
         var offset = 0L
@@ -198,7 +196,7 @@ private[spark] class IndexShuffleBlockResolver(
     // find out the consolidated file, then the offset within that from our index
     val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
 
-    val in = new DataInputStream(Files.newInputStream(indexFile.toPath))
+    val in = new DataInputStream(new FileInputStream(indexFile))
     try {
       ByteStreams.skipFully(in, blockId.reduceId * 8)
       val offset = in.readLong()
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 6f5b5bb365..375f4a6921 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -18,8 +18,6 @@
 package org.apache.spark.util.collection
 
 import java.io._
-import java.nio.channels.{Channels, FileChannel}
-import java.nio.file.StandardOpenOption
 import java.util.Comparator
 
 import scala.collection.BufferedIterator
@@ -461,7 +459,7 @@ class ExternalAppendOnlyMap[K, V, C](
     )
 
     private var batchIndex = 0  // Which batch we're in
-    private var fileChannel: FileChannel = null
+    private var fileStream: FileInputStream = null
 
     // An intermediate stream that reads from exactly one batch
     // This guards against pre-fetching and other arbitrary behavior of higher level streams
@@ -478,14 +476,14 @@ class ExternalAppendOnlyMap[K, V, C](
       if (batchIndex < batchOffsets.length - 1) {
         if (deserializeStream != null) {
           deserializeStream.close()
-          fileChannel.close()
+          fileStream.close()
           deserializeStream = null
-          fileChannel = null
+          fileStream = null
         }
 
         val start = batchOffsets(batchIndex)
-        fileChannel = FileChannel.open(file.toPath, StandardOpenOption.READ)
-        fileChannel.position(start)
+        fileStream = new FileInputStream(file)
+        fileStream.getChannel.position(start)
         batchIndex += 1
 
         val end = batchOffsets(batchIndex)
@@ -493,8 +491,7 @@ class ExternalAppendOnlyMap[K, V, C](
         assert(end >= start, "start = " + start + ", end = " + end +
           ", batchOffsets = " + batchOffsets.mkString("[", ", ", "]"))
 
-        val bufferedStream = new BufferedInputStream(
-          ByteStreams.limit(Channels.newInputStream(fileChannel), end - start))
+        val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
         val wrappedStream = serializerManager.wrapStream(blockId, bufferedStream)
         ser.deserializeStream(wrappedStream)
       } else {
@@ -554,9 +551,9 @@ class ExternalAppendOnlyMap[K, V, C](
         ds.close()
         deserializeStream = null
       }
-      if (fileChannel != null) {
-        fileChannel.close()
-        fileChannel = null
+      if (fileStream != null) {
+        fileStream.close()
+        fileStream = null
       }
       if (file.exists()) {
         if (!file.delete()) {
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 3593cfd507..176f84fa2a 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -18,8 +18,6 @@
 package org.apache.spark.util.collection
 
 import java.io._
-import java.nio.channels.{Channels, FileChannel}
-import java.nio.file.StandardOpenOption
 import java.util.Comparator
 
 import scala.collection.mutable
@@ -494,7 +492,7 @@ private[spark] class ExternalSorter[K, V, C](
 
     // Intermediate file and deserializer streams that read from exactly one batch
     // This guards against pre-fetching and other arbitrary behavior of higher level streams
-    var fileChannel: FileChannel = null
+    var fileStream: FileInputStream = null
     var deserializeStream = nextBatchStream()  // Also sets fileStream
 
     var nextItem: (K, C) = null
@@ -507,14 +505,14 @@ private[spark] class ExternalSorter[K, V, C](
      if (batchId < batchOffsets.length - 1) {
        if (deserializeStream != null) {
          deserializeStream.close()
-          fileChannel.close()
+          fileStream.close()
          deserializeStream = null
-          fileChannel = null
+          fileStream = null
        }
 
        val start = batchOffsets(batchId)
-        fileChannel = FileChannel.open(spill.file.toPath, StandardOpenOption.READ)
-        fileChannel.position(start)
+        fileStream = new FileInputStream(spill.file)
+        fileStream.getChannel.position(start)
        batchId += 1
 
        val end = batchOffsets(batchId)
@@ -522,8 +520,7 @@ private[spark] class ExternalSorter[K, V, C](
        assert(end >= start, "start = " + start + ", end = " + end +
          ", batchOffsets = " + batchOffsets.mkString("[", ", ", "]"))
 
-        val bufferedStream = new BufferedInputStream(
-          ByteStreams.limit(Channels.newInputStream(fileChannel), end - start))
+        val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
 
        val wrappedStream = serializerManager.wrapStream(spill.blockId, bufferedStream)
        serInstance.deserializeStream(wrappedStream)
@@ -613,7 +610,7 @@ private[spark] class ExternalSorter[K, V, C](
       batchId = batchOffsets.length  // Prevent reading any other batch
       val ds = deserializeStream
       deserializeStream = null
-      fileChannel = null
+      fileStream = null
       if (ds != null) {
         ds.close()
       }
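
Note on the copy path restored above: the merge loop in BypassMergeSortShuffleWriter now opens the destination with new FileOutputStream(outputFile, true) (append mode, the SPARK-3948 workaround) and delegates each per-partition copy to Utils.copyStream(in, out, false, transferToEnabled), where spark.file.transferTo selects between a FileChannel.transferTo copy and a plain buffered stream copy. The sketch below is a minimal illustration of those two code paths under that assumption; the class and method names (PartitionFileCopier, copy) are hypothetical and this is not Spark's Utils.copyStream implementation.

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;

// Hypothetical illustration only -- not Spark's Utils.copyStream.
final class PartitionFileCopier {

  // Copies `src` onto the end of `out` and returns the number of bytes copied.
  static long copy(File src, FileOutputStream out, boolean transferToEnabled) throws IOException {
    try (FileInputStream fis = new FileInputStream(src)) {
      if (transferToEnabled) {
        // transferTo path: let the kernel move the bytes between file channels.
        FileChannel inChannel = fis.getChannel();
        FileChannel outChannel = out.getChannel();
        long size = inChannel.size();
        long startPosition = outChannel.position();
        long copied = 0;
        while (copied < size) {
          copied += inChannel.transferTo(copied, size - copied, outChannel);
        }
        // Sanity check in the spirit of SPARK-3948: verify the destination actually
        // advanced by the full length of the source file.
        if (outChannel.position() != startPosition + size) {
          throw new IOException("transferTo copied fewer bytes than expected");
        }
        return size;
      } else {
        // Fallback path (spark.file.transferTo=false): plain buffered stream copy.
        byte[] buffer = new byte[64 * 1024];
        long copied = 0;
        int read;
        while ((read = fis.read(buffer)) != -1) {
          out.write(buffer, 0, read);
          copied += read;
        }
        return copied;
      }
    }
  }

  public static void main(String[] args) throws IOException {
    // Append mode (second constructor argument `true`) is the SPARK-3948 workaround the
    // reverted code relies on when transferTo is used.
    try (FileOutputStream out = new FileOutputStream(new File(args[1]), true)) {
      long copied = copy(new File(args[0]), out, true);
      System.out.println(copied + " bytes copied");
    }
  }
}

Usage mirrors the writer: the destination stream is opened once in append mode and each partition file is appended in sequence.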